Commit 7a60e044 authored by wanglch

Initial commit
from .gpt import OpenAIWrapper, GPT4V
from .gpt_int import OpenAIWrapperInternal, GPT4V_Internal
__all__ = [
'OpenAIWrapper', 'OpenAIWrapperInternal', 'GPT4V', 'GPT4V_Internal'
]
import time
import random as rd
from abc import abstractmethod
import os.path as osp
import copy as cp
from ..smp import get_logger, parse_file
class BaseAPI:
allowed_types = ['text', 'image']
INTERLEAVE = True
INSTALL_REQ = False
def __init__(self,
retry=10,
wait=3,
system_prompt=None,
verbose=True,
fail_msg='Failed to obtain answer via API.',
**kwargs):
"""Base Class for all APIs.
Args:
retry (int, optional): The retry times for `generate_inner`. Defaults to 10.
wait (int, optional): The wait time after each failed retry of `generate_inner`. Defaults to 3.
system_prompt (str, optional): Defaults to None.
verbose (bool, optional): Defaults to True.
fail_msg (str, optional): The message to return when failed to obtain answer.
Defaults to 'Failed to obtain answer via API.'.
**kwargs: Other kwargs for `generate_inner`.
"""
self.wait = wait
self.retry = retry
self.system_prompt = system_prompt
self.verbose = verbose
self.fail_msg = fail_msg
self.logger = get_logger('ChatAPI')
if len(kwargs):
self.logger.info(f'BaseAPI received the following kwargs: {kwargs}')
self.logger.info('Will try to use them as kwargs for `generate`. ')
self.default_kwargs = kwargs
@abstractmethod
def generate_inner(self, inputs, **kwargs):
"""The inner function to generate the answer.
Returns:
tuple(int, str, str): ret_code, response, log
"""
        self.logger.warning('For BaseAPI, generate_inner is an abstract method. ')
assert 0, 'generate_inner not defined'
ret_code, answer, log = None, None, None
# if ret_code is 0, means succeed
return ret_code, answer, log
def working(self):
"""If the API model is working, return True, else return False.
Returns:
bool: If the API model is working, return True, else return False.
"""
retry = 3
while retry > 0:
ret = self.generate('hello')
if ret is not None and ret != '' and self.fail_msg not in ret:
return True
retry -= 1
return False
def check_content(self, msgs):
"""Check the content type of the input. Four types are allowed: str, dict, liststr, listdict.
Args:
msgs: Raw input messages.
Returns:
str: The message type.
"""
if isinstance(msgs, str):
return 'str'
if isinstance(msgs, dict):
return 'dict'
if isinstance(msgs, list):
types = [self.check_content(m) for m in msgs]
if all(t == 'str' for t in types):
return 'liststr'
if all(t == 'dict' for t in types):
return 'listdict'
return 'unknown'
def preproc_content(self, inputs):
"""Convert the raw input messages to a list of dicts.
Args:
inputs: raw input messages.
Returns:
list(dict): The preprocessed input messages. Will return None if failed to preprocess the input.
"""
if self.check_content(inputs) == 'str':
return [dict(type='text', value=inputs)]
elif self.check_content(inputs) == 'dict':
assert 'type' in inputs and 'value' in inputs
return [inputs]
elif self.check_content(inputs) == 'liststr':
res = []
for s in inputs:
mime, pth = parse_file(s)
if mime is None or mime == 'unknown':
res.append(dict(type='text', value=s))
else:
res.append(dict(type=mime.split('/')[0], value=pth))
return res
elif self.check_content(inputs) == 'listdict':
for item in inputs:
assert 'type' in item and 'value' in item
mime, s = parse_file(item['value'])
if mime is None:
assert item['type'] == 'text', item['value']
else:
assert mime.split('/')[0] == item['type']
item['value'] = s
return inputs
else:
return None
def generate(self, message, **kwargs1):
"""The main function to generate the answer. Will call `generate_inner` with the preprocessed input messages.
Args:
message: raw input messages.
Returns:
            str: The generated answer, or the fail message if no answer could be obtained.
"""
assert self.check_content(message) in ['str', 'dict', 'liststr', 'listdict'], f'Invalid input type: {message}'
message = self.preproc_content(message)
assert message is not None and self.check_content(message) == 'listdict'
for item in message:
assert item['type'] in self.allowed_types, f'Invalid input type: {item["type"]}'
# merge kwargs
kwargs = cp.deepcopy(self.default_kwargs)
kwargs.update(kwargs1)
answer = None
# a very small random delay [0s - 0.5s]
T = rd.random() * 0.5
time.sleep(T)
for i in range(self.retry):
try:
ret_code, answer, log = self.generate_inner(message, **kwargs)
if ret_code == 0 and self.fail_msg not in answer and answer != '':
if self.verbose:
print(answer)
return answer
elif self.verbose:
if not isinstance(log, str):
try:
log = log.text
except:
self.logger.warning(f'Failed to parse {log} as an http response. ')
self.logger.info(f'RetCode: {ret_code}\nAnswer: {answer}\nLog: {log}')
except Exception as err:
if self.verbose:
                    self.logger.error(f'An error occurred during try {i}:')
self.logger.error(err)
# delay before each retry
T = rd.random() * self.wait * 2
time.sleep(T)
return self.fail_msg if answer in ['', None] else answer
def message_to_promptimg(self, message):
assert not self.INTERLEAVE
model_name = self.__class__.__name__
import warnings
warnings.warn(
f'Model {model_name} does not support interleaved input. '
'Will use the first image and aggregated texts as prompt. ')
num_images = len([x for x in message if x['type'] == 'image'])
if num_images == 0:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
image = None
elif num_images == 1:
prompt = '\n'.join([x['value'] for x in message if x['type'] == 'text'])
image = [x['value'] for x in message if x['type'] == 'image'][0]
else:
prompt = '\n'.join([x['value'] if x['type'] == 'text' else '<image>' for x in message])
image = [x['value'] for x in message if x['type'] == 'image'][0]
return prompt, image
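# A minimal sketch of how a concrete wrapper subclasses BaseAPI (illustrative only: `EchoAPI`
# is a hypothetical example, not a model shipped with this package). `generate_inner` receives
# the preprocessed listdict messages and must return (ret_code, answer, log), with
# ret_code == 0 signalling success to `generate`.
class EchoAPI(BaseAPI):

    def generate_inner(self, inputs, **kwargs):
        # Join all text segments and echo them back; images are ignored in this toy example.
        text = '\n'.join(x['value'] for x in inputs if x['type'] == 'text')
        return 0, f'echo: {text}', 'ok'

# EchoAPI().generate('hello') returns 'echo: hello', so EchoAPI().working() evaluates to True.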
from ..smp import *
import os
import sys
from .base import BaseAPI
APIBASES = {
'OFFICIAL': 'https://api.openai.com/v1/chat/completions',
}
def GPT_context_window(model):
length_map = {
'gpt-4-1106-preview': 128000,
'gpt-4-vision-preview': 128000,
'gpt-4': 8192,
'gpt-4-32k': 32768,
'gpt-4-0613': 8192,
'gpt-4-32k-0613': 32768,
'gpt-3.5-turbo-1106': 16385,
'gpt-3.5-turbo': 4096,
'gpt-3.5-turbo-16k': 16385,
'gpt-3.5-turbo-instruct': 4096,
'gpt-3.5-turbo-0613': 4096,
'gpt-3.5-turbo-16k-0613': 16385,
}
if model in length_map:
return length_map[model]
else:
return 128000
class OpenAIWrapper(BaseAPI):
is_api: bool = True
def __init__(self,
model: str = 'gpt-3.5-turbo-0613',
retry: int = 5,
wait: int = 5,
key: str = None,
verbose: bool = True,
system_prompt: str = None,
temperature: float = 0,
timeout: int = 60,
api_base: str = None,
max_tokens: int = 1024,
img_size: int = 512,
img_detail: str = 'low',
**kwargs):
self.model = model
self.cur_idx = 0
self.fail_msg = 'Failed to obtain answer via API. '
self.max_tokens = max_tokens
self.temperature = temperature
if 'step-1v' in model:
env_key = os.environ.get('STEPAI_API_KEY', '')
if key is None:
key = env_key
else:
env_key = os.environ.get('OPENAI_API_KEY', '')
if key is None:
key = env_key
assert isinstance(key, str) and key.startswith('sk-'), (
f'Illegal openai_key {key}. '
'Please set the environment variable OPENAI_API_KEY to your openai key. '
)
self.key = key
assert img_size > 0 or img_size == -1
self.img_size = img_size
assert img_detail in ['high', 'low']
self.img_detail = img_detail
self.timeout = timeout
super().__init__(wait=wait, retry=retry, system_prompt=system_prompt, verbose=verbose, **kwargs)
if api_base is None:
if 'OPENAI_API_BASE' in os.environ and os.environ['OPENAI_API_BASE'] != '':
self.logger.error('Environment variable OPENAI_API_BASE is set. Will use it as api_base. ')
api_base = os.environ['OPENAI_API_BASE']
else:
api_base = 'OFFICIAL'
assert api_base is not None
if api_base in APIBASES:
self.api_base = APIBASES[api_base]
elif api_base.startswith('http'):
self.api_base = api_base
else:
self.logger.error('Unknown API Base. ')
sys.exit(-1)
self.logger.info(f'Using API Base: {self.api_base}; API Key: {self.key}')
# inputs can be a lvl-2 nested list: [content1, content2, content3, ...]
# content can be a string or a list of image & text
def prepare_inputs(self, inputs):
input_msgs = []
if self.system_prompt is not None:
input_msgs.append(dict(role='system', content=self.system_prompt))
has_images = np.sum([x['type'] == 'image' for x in inputs])
if has_images:
content_list = []
for msg in inputs:
if msg['type'] == 'text':
content_list.append(dict(type='text', text=msg['value']))
elif msg['type'] == 'image':
from PIL import Image
img = Image.open(msg['value'])
b64 = encode_image_to_base64(img, target_size=self.img_size)
img_struct = dict(url=f'data:image/jpeg;base64,{b64}', detail=self.img_detail)
content_list.append(dict(type='image_url', image_url=img_struct))
input_msgs.append(dict(role='user', content=content_list))
else:
assert all([x['type'] == 'text' for x in inputs])
text = '\n'.join([x['value'] for x in inputs])
input_msgs.append(dict(role='user', content=text))
return input_msgs
def generate_inner(self, inputs, **kwargs) -> str:
input_msgs = self.prepare_inputs(inputs)
temperature = kwargs.pop('temperature', self.temperature)
max_tokens = kwargs.pop('max_tokens', self.max_tokens)
context_window = GPT_context_window(self.model)
max_tokens = min(max_tokens, context_window - self.get_token_len(inputs))
if 0 < max_tokens <= 100:
self.logger.warning(
'Less than 100 tokens left, '
'may exceed the context window with some additional meta symbols. '
)
if max_tokens <= 0:
return 0, self.fail_msg + 'Input string longer than context window. ', 'Length Exceeded. '
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {self.key}'}
payload = dict(
model=self.model,
messages=input_msgs,
max_tokens=max_tokens,
n=1,
temperature=temperature,
**kwargs)
response = requests.post(self.api_base, headers=headers, data=json.dumps(payload), timeout=self.timeout * 1.1)
ret_code = response.status_code
ret_code = 0 if (200 <= int(ret_code) < 300) else ret_code
answer = self.fail_msg
try:
resp_struct = json.loads(response.text)
answer = resp_struct['choices'][0]['message']['content'].strip()
except:
pass
return ret_code, answer, response
def get_token_len(self, inputs) -> int:
import tiktoken
try:
enc = tiktoken.encoding_for_model(self.model)
except:
enc = tiktoken.encoding_for_model('gpt-4')
assert isinstance(inputs, list)
tot = 0
for item in inputs:
if item['type'] == 'text':
tot += len(enc.encode(item['value']))
elif item['type'] == 'image':
tot += 85
if self.img_detail == 'high':
img = Image.open(item['value'])
npatch = np.ceil(img.size[0] / 512) * np.ceil(img.size[1] / 512)
tot += npatch * 170
return tot
class GPT4V(OpenAIWrapper):
def generate(self, message, dataset=None):
return super(GPT4V, self).generate(message)
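# A minimal usage sketch: `image_path` is a placeholder that must point to an existing local
# image, and a valid key is needed via the `key` argument or the OPENAI_API_KEY variable.
def _gpt4v_demo(image_path='/path/to/image.jpg'):
    wrapper = GPT4V('gpt-4-vision-preview', max_tokens=512, img_detail='low')
    return wrapper.generate([
        dict(type='image', value=image_path),
        dict(type='text', value='Describe the image in one sentence.'),
    ])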
import json
import warnings
import requests
from ..smp import *
from .gpt import GPT_context_window, OpenAIWrapper
url = 'http://ecs.sv.us.alles-apin.openxlab.org.cn/v1/openai/v2/text/chat'
headers = {
'Content-Type': 'application/json'
}
class OpenAIWrapperInternal(OpenAIWrapper):
is_api: bool = True
def __init__(self,
model: str = 'gpt-3.5-turbo-0613',
retry: int = 5,
wait: int = 3,
verbose: bool = True,
system_prompt: str = None,
temperature: float = 0,
timeout: int = 60,
max_tokens: int = 1024,
img_size: int = 512,
img_detail: str = 'low',
**kwargs):
self.model = model
if 'KEYS' in os.environ and osp.exists(os.environ['KEYS']):
keys = load(os.environ['KEYS'])
headers['alles-apin-token'] = keys.get('alles-apin-token', '')
elif 'ALLES' in os.environ:
headers['alles-apin-token'] = os.environ['ALLES']
self.headers = headers
self.temperature = temperature
self.timeout = timeout
self.max_tokens = max_tokens
assert img_size > 0 or img_size == -1
self.img_size = img_size
assert img_detail in ['high', 'low']
self.img_detail = img_detail
super(OpenAIWrapper, self).__init__(
wait=wait, retry=retry, system_prompt=system_prompt, verbose=verbose, **kwargs)
def generate_inner(self, inputs, **kwargs) -> str:
input_msgs = self.prepare_inputs(inputs)
temperature = kwargs.pop('temperature', self.temperature)
max_tokens = kwargs.pop('max_tokens', self.max_tokens)
# Held out 100 tokens as buffer
context_window = GPT_context_window(self.model)
max_tokens = min(max_tokens, context_window - self.get_token_len(inputs))
if 0 < max_tokens <= 100:
print('Less than 100 tokens left, may exceed the context window with some additional meta symbols. ')
if max_tokens <= 0:
return 0, self.fail_msg + 'Input string longer than context window. ', 'Length Exceeded. '
payload = dict(
model=self.model,
messages=input_msgs,
max_tokens=max_tokens,
n=1,
stop=None,
timeout=self.timeout,
temperature=temperature,
**kwargs)
response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=self.timeout * 1.1)
ret_code = response.status_code
ret_code = 0 if (200 <= int(ret_code) < 300) else ret_code
answer = self.fail_msg
try:
resp_struct = json.loads(response.text)
assert resp_struct['msg'] == 'ok' and resp_struct['msgCode'] == '10000', resp_struct
answer = resp_struct['data']['choices'][0]['message']['content'].strip()
except:
pass
return ret_code, answer, response
class GPT4V_Internal(OpenAIWrapperInternal):
def generate(self, message, dataset=None):
return super(GPT4V_Internal, self).generate(message)
from vlmeval.vlm import *
from vlmeval.api import *
from functools import partial
ungrouped = {
'MiniCPM-V':partial(MiniCPM_V, model_path='openbmb/MiniCPM-V'),
'MiniCPM-V-2':partial(MiniCPM_V, model_path='openbmb/MiniCPM-V-2'),
'MiniCPM-Llama3-V-2_5':partial(MiniCPM_Llama3_V, model_path='openbmb/MiniCPM-Llama3-V-2_5'),
}
supported_VLM = {}
model_groups = [
ungrouped
]
for grp in model_groups:
supported_VLM.update(grp)
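# A minimal sketch of how the registry is consumed: each entry is a functools.partial, so
# calling it instantiates the corresponding VLM (which loads its checkpoint). The name below
# is one of the keys registered above.
def _build_vlm(name='MiniCPM-V-2'):
    return supported_VLM[name]()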
from vlmeval.smp import *
def OCRBench_eval(eval_file):
OCRBench_score = {
'Regular Text Recognition': 0,
'Irregular Text Recognition': 0,
'Artistic Text Recognition': 0,
'Handwriting Recognition': 0,
'Digit String Recognition': 0,
'Non-Semantic Text Recognition': 0,
'Scene Text-centric VQA': 0,
'Doc-oriented VQA': 0,
'Key Information Extraction': 0,
'Handwritten Mathematical Expression Recognition': 0
}
logger = get_logger('Evaluation')
data = load(eval_file)
lt = len(data)
lines = [data.iloc[i] for i in range(lt)]
for i in tqdm(range(len(lines))):
line = lines[i]
predict = str(line['prediction'])
answers = eval(line['answer'])
category = line['category']
if category == 'Handwritten Mathematical Expression Recognition':
for j in range(len(answers)):
answer = answers[j].strip().replace('\n', ' ').replace(' ', '')
predict = predict.strip().replace('\n', ' ').replace(' ', '')
if answer in predict:
OCRBench_score[category] += 1
break
else:
for j in range(len(answers)):
answer = answers[j].lower().strip().replace('\n', ' ')
predict = predict.lower().strip().replace('\n', ' ')
if answer in predict:
OCRBench_score[category] += 1
break
final_score_dict = {}
final_score_dict['Text Recognition'] = (
OCRBench_score['Regular Text Recognition'] + OCRBench_score['Irregular Text Recognition']
+ OCRBench_score['Artistic Text Recognition'] + OCRBench_score['Handwriting Recognition']
+ OCRBench_score['Digit String Recognition'] + OCRBench_score['Non-Semantic Text Recognition']
)
final_score_dict['Scene Text-centric VQA'] = OCRBench_score['Scene Text-centric VQA']
final_score_dict['Doc-oriented VQA'] = OCRBench_score['Doc-oriented VQA']
final_score_dict['Key Information Extraction'] = OCRBench_score['Key Information Extraction']
final_score_dict['Handwritten Mathematical Expression Recognition'] = \
OCRBench_score['Handwritten Mathematical Expression Recognition']
final_score_dict['Final Score'] = (
final_score_dict['Text Recognition'] + final_score_dict['Scene Text-centric VQA']
+ final_score_dict['Doc-oriented VQA'] + final_score_dict['Key Information Extraction']
+ final_score_dict['Handwritten Mathematical Expression Recognition']
)
final_score_dict['Final Score Norm'] = float(final_score_dict['Final Score']) / 10
score_pth = eval_file.replace('.xlsx', '_score.json')
dump(final_score_dict, score_pth)
logger.info(f'OCRBench_eval successfully finished evaluating {eval_file}, results saved in {score_pth}')
logger.info('Score: ')
for key, value in final_score_dict.items():
logger.info('{}:{}'.format(key, value))
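# A toy end-to-end sketch of the expected input: an .xlsx with 'prediction', 'answer'
# (a stringified list of acceptable answers) and 'category' columns. The file name is a
# placeholder, and the sketch assumes the smp `dump` / `load` helpers handle .xlsx files,
# as the pipeline above does.
def _ocrbench_demo(tmp_path='ocrbench_demo.xlsx'):
    import pandas as pd
    df = pd.DataFrame({
        'prediction': ['hello world', '3x+2'],
        'answer': [str(['Hello World']), str(['3x + 2'])],
        'category': ['Regular Text Recognition',
                     'Handwritten Mathematical Expression Recognition'],
    })
    dump(df, tmp_path)
    OCRBench_eval(tmp_path)  # both toy lines match, so 'Final Score' is 2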
from .yes_or_no import default_rating, MME_rating, YOrN_eval
from .mmvet_eval import MMVet_eval
from .multiple_choice import multiple_choice_eval
from .coco_eval import COCO_eval
from .vqa_eval import VQAEval
from .mathvista_eval import MathVista_eval
from .llavabench import LLaVABench_eval
from .misc import build_judge
from .OCRBench import OCRBench_eval
from vlmeval.smp import *
from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.cider.cider import Cider
class COCO_Caption_Scorer():
def __init__(self, ref, gt):
self.ref = ref
self.gt = gt
print('setting up scorers...')
self.scorers = [
(Bleu(4), ['Bleu_1', 'Bleu_2', 'Bleu_3', 'Bleu_4']),
# (Meteor(), "METEOR"), # need java version 11.0.16+
(Rouge(), 'ROUGE_L'),
(Cider(), 'CIDEr'),
# (Spice(), "SPICE"), # need java version 11.0.16+
]
def compute_scores(self):
total_scores = {}
for scorer, method in self.scorers:
print('computing %s score...' % (scorer.method()))
score, scores = scorer.compute_score(self.gt, self.ref)
if type(method) == list:
for sc, scs, m in zip(score, scores, method):
print('%s: %0.3f' % (m, sc * 100))
total_scores['Bleu'] = [x * 100 for x in score]
else:
print('%s: %0.3f' % (method, score * 100))
total_scores[method] = score * 100
print('*****DONE*****')
for key, value in total_scores.items():
print('{}:{}'.format(key, value))
return total_scores
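# A toy usage sketch: `ref` maps each sample id to a single predicted caption and `gt` to a
# list of reference captions, mirroring the dictionaries COCO_eval builds below. The captions
# here are made up.
def _coco_scorer_demo():
    ref = {'0': ['a cat sits on a mat']}
    gt = {'0': ['a cat is sitting on a mat', 'there is a cat on the mat']}
    return COCO_Caption_Scorer(ref, gt).compute_scores()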
def COCO_eval(eval_file, nproc=4, verbose=False):
logger = get_logger('Evaluation')
data = load(eval_file)
lt = len(data)
lines = [data.iloc[i] for i in range(lt)]
ref = {}
gt = {}
for i, line in enumerate(lines):
ref[str(i)] = [str(line['prediction'])]
gt[str(i)] = eval(line['answer'])
scorer = COCO_Caption_Scorer(ref, gt)
coco_caption_score_dict = scorer.compute_scores()
score_pth = eval_file.replace('.xlsx', '_score.json')
dump(coco_caption_score_dict, score_pth)
logger.info(f'COCO_eval successfully finished evaluating {eval_file}, results saved in {score_pth}')
logger.info('Score: ')
for key, value in coco_caption_score_dict.items():
logger.info('{}:{}'.format(key, value))
def parse_args():
parser = argparse.ArgumentParser(description='Inference LLM Answers. ')
parser.add_argument('--data', type=str, help='The question set for inference, in excel / tsv / json format. ')
parser.add_argument('--nproc', type=int, default=4)
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
COCO_eval(eval_file=args.data, nproc=args.nproc, verbose=args.verbose)
import argparse
import numpy as np
import pandas as pd
import os.path as osp
from vlmeval.evaluate.misc import build_judge
from vlmeval.smp import *
from vlmeval.utils import track_progress_rich
rule_dict = {
'llava_bench_conv': {'role': 'Assistant', 'prompt': 'We would like to request your feedback on the performance of two AI assistants in response to the user question displayed above. The user asks the question on observing an image. For your reference, the visual content in the image is represented with a few sentences describing the image. \nPlease rate the helpfulness, relevance, accuracy, level of details of their responses. Each assistant receives an overall score on a scale of 1 to 10, where a higher score indicates better overall performance.\nPlease first output a single line containing only two values indicating the scores for Assistant 1 and 2, respectively. The two scores are separated by a space.\nIn the subsequent line, please provide a comprehensive explanation of your evaluation, avoiding any potential bias and ensuring that the order in which the responses were presented does not affect your judgment.'}, # noqa: E501
'llava_bench_detail': {'role': 'Assistant', 'prompt': 'We would like to request your feedback on the performance of two AI assistants in response to the user question displayed above. The user asks the question on observing an image. For your reference, the visual content in the image is represented with a few sentences describing the image. \nPlease rate the helpfulness, relevance, accuracy, level of details of their responses. Each assistant receives an overall score on a scale of 1 to 10, where a higher score indicates better overall performance.\nPlease first output a single line containing only two values indicating the scores for Assistant 1 and 2, respectively. The two scores are separated by a space.\nIn the subsequent line, please provide a comprehensive explanation of your evaluation, avoiding any potential bias and ensuring that the order in which the responses were presented does not affect your judgment.'}, # noqa: E501
'llava_bench_complex': {'role': 'Assistant', 'prompt': 'We would like to request your feedback on the performance of two AI assistants in response to the user question displayed above. The user asks the question on observing an image. For your reference, the visual content in the image is represented with a few sentences describing the image. \nPlease rate the helpfulness, relevance, accuracy, level of details of their responses. Each assistant receives an overall score on a scale of 1 to 10, where a higher score indicates better overall performance.\nPlease first output a single line containing only two values indicating the scores for Assistant 1 and 2, respectively. The two scores are separated by a space.\nIn the subsequent line, please provide a comprehensive explanation of your evaluation, avoiding any potential bias and ensuring that the order in which the responses were presented does not affect your judgment.'} # noqa: E501
}
def get_eval(judge, content):
return judge.generate(content)
def parse_score(review):
logger = get_logger('Evaluation')
try:
score_pair = review.split('\n')[0]
score_pair = score_pair.replace(',', ' ')
sp = score_pair.split(' ')
if len(sp) == 2:
return [float(sp[0]), float(sp[1])]
else:
                logger.error(f'Error parsing the review: {review}')
return [-1, -1]
except Exception as e:
        logger.error(f'{e}\nError parsing the review: {review}')
return [-1, -1]
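# A sketch of the review format parse_score expects: the judge's first line carries the two
# scores (Assistant 1, then Assistant 2); anything else falls back to [-1, -1].
# parse_score('7 9\nAssistant 2 answered more precisely.')  ->  [7.0, 9.0]
# parse_score('No scores were given.')                      ->  [-1, -1]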
def build_prompt(line):
cap_str = line['caption']
question = line['question']
ans1 = line['gpt4_ans']
ans2 = line['prediction']
category = 'llava_bench_' + line['category']
rule = rule_dict[category]
role, prompt = rule['role'], rule['prompt']
content = (f'[Context]\n{cap_str}\n\n'
f'[Question]\n{question}\n\n'
f'[{role} 1]\n{ans1}\n\n[End of {role} 1]\n\n'
f'[{role} 2]\n{ans2}\n\n[End of {role} 2]\n\n'
f'[System]\n{prompt}\n\n')
return content
def LLaVABench_atomeval(model, prompt):
review = get_eval(model, prompt)
scores = parse_score(review)
return scores
def LLaVABench_score(data):
cates = ['overall'] + list(set(data['category']))
ret = defaultdict(list)
for c in cates:
ret['split'].append(c)
sub = data[data['category'] == c] if c != 'overall' else data
ret['Relative Score (main)'].append(np.mean(sub['score']) / np.mean(sub['gpt4_score']) * 100)
ret['VLM Score'].append(np.mean(sub['score']) * 10)
ret['GPT4 Score'].append(np.mean(sub['gpt4_score']) * 10)
return pd.DataFrame(ret)
def LLaVABench_eval(eval_file, **judge_kwargs):
suffix = '.' + eval_file.split('.')[-1]
record_file = eval_file.replace(suffix, '_openai_result' + suffix)
score_file = eval_file.replace(suffix, '_score.csv')
nproc = judge_kwargs.pop('nproc', 4)
if not osp.exists(record_file):
data = load(eval_file)
lines = [data.iloc[i] for i in range(len(data))]
model = build_judge(
temperature=0.2,
system_prompt='You are a helpful and precise assistant for checking the quality of the answer.',
**judge_kwargs)
prompts = [build_prompt(line) for line in lines]
tups = [(model, prompt) for prompt in prompts]
scores = track_progress_rich(LLaVABench_atomeval, tups, nproc=nproc, chunksize=nproc)
data['gpt4_score'] = [x[0] for x in scores]
data['score'] = [x[1] for x in scores]
dump(data, record_file)
data = load(record_file)
ret = LLaVABench_score(data).round(1)
print(ret)
dump(ret, score_file)
return ret
def parse_args():
parser = argparse.ArgumentParser(description='LLaVABench Evaluation. ')
parser.add_argument('data', type=str, help='The question set for inference, in excel / tsv / json format. ')
parser.add_argument(
'--model', type=str, help='The LLM (GPT) used for inference. ', default='gpt-4-turbo',
choices=['gpt-4-0613', 'gpt-4-turbo', 'chatgpt-1106', 'chatgpt-0613', 'gpt-4-0314'])
parser.add_argument('--nproc', type=int, default=4)
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
return args
if __name__ == '__main__':
load_env()
args = parse_args()
judge_kwargs = dict(model=args.model, nproc=args.nproc, verbose=args.verbose)
if 'OPENAI_API_KEY_JUDGE' in os.environ and os.environ['OPENAI_API_KEY_JUDGE']:
judge_kwargs['key'] = os.environ['OPENAI_API_KEY_JUDGE']
if 'OPENAI_API_BASE_JUDGE' in os.environ and os.environ['OPENAI_API_BASE_JUDGE']:
judge_kwargs['api_base'] = os.environ['OPENAI_API_BASE_JUDGE']
LLaVABench_eval(eval_file=args.data, **judge_kwargs)
from vlmeval.evaluate.misc import build_judge
from vlmeval.smp import *
from vlmeval.utils import track_progress_rich
from vlmeval.utils.matching_util import can_infer
def get_gpt4_ICE():
example_1 = """
Hint: Please answer the question requiring an integer answer and provide the final value,
e.g., 1, 2, 3, at the end.\n
Question: Which number is missing?\n
Model response: The number missing in the sequence is 14.\n
Extracted answer: 14
"""
example_2 = """
Hint: Please answer the question requiring a floating-point number with one decimal place and provide the final value,
e.g., 1.2, 1.3, 1.4, at the end.\n
Question: What is the fraction of females facing the camera?\n
Model response: The fraction of females facing the camera is 0.6,
which means that six out of ten females in the group are facing the camera.\n
Extracted answer: 0.6
"""
example_3 = """
Hint: Please answer the question requiring a floating-point number with two decimal places and provide the final value,
e.g., 1.23, 1.34, 1.45, at the end.\n
Question: How much money does Luca need to buy a sour apple candy and a butter-scotch candy? (Unit: $)\n
Model response: Luca needs $1.45 to buy a sour apple candy and a butterscotch candy.\n
Extracted answer: 1.45
"""
example_4 = """
Hint: Please answer the question requiring a Python list as an answer and provide the final list,
e.g., [1, 2, 3], [1.2, 1.3, 1.4], at the end.\n
Question: Between which two years does the line graph saw its maximum peak?\n
Model response: The line graph saw its maximum peak between 2007 and 2008.\n
Extracted answer: [2007, 2008]
"""
example_5 = """
Hint: Please answer the question and provide the correct option letter, e.g., A, B, C, D, at the end.\n
Question: What fraction of the shape is blue?\n
Choices: (A) 3/11 (B) 8/11 (C) 6/11 (D) 3/5\n
Model response: The correct answer is (B) 8/11.\n
Extracted answer: B
"""
return [example_1, example_2, example_3, example_4, example_5]
def build_mathvista_gpt4_prompt(line):
task_description = """
Please read the following example.
Then extract the answer from the model response and type it at the end of the prompt.\n
"""
question = line['question']
prediction = str(line['prediction'])
prompt = task_description
examples = get_gpt4_ICE()
for example in examples:
prompt += example + '\n'
prompt += question + '\n'
    prompt += 'Model response: ' + prediction + '\n'
    prompt += 'Extracted answer:'
return prompt
def list_to_dict(lst):
return {chr(65 + i): val for i, val in enumerate(lst)}
def post_check(line, prefetch=False):
res = None
ans = line['answer']
response = line['prediction'] if prefetch else line['res']
try:
if line['question_type'] == 'multi_choice':
ans = line['answer_option']
choices = list_to_dict(eval(line['choices']))
res = can_infer(response, choices)
if prefetch:
return res
else:
if line['answer_type'] == 'integer':
res = int(response)
ans = int(line['answer'])
elif line['answer_type'] == 'float':
res = float(response)
ans = float(line['answer'])
else:
res = str(res)
ans = str(ans)
except ValueError:
pass
if res == ans:
return res if prefetch else True
else:
return False
def MathVista_auxeval(model, line):
prompt = build_mathvista_gpt4_prompt(line)
log = ''
retry = 5
    res = post_check(line, prefetch=True)
    if res:
        return dict(log='Prefetch succeed', res=res)
for i in range(retry):
prediction = line['prediction']
res = model.generate(prompt, temperature=i * 0.5)
        if res is None or model.fail_msg in res:
log += f'Try {i}: output is {prediction}, failed to parse.\n'
else:
log += 'Succeed'
return dict(log=log, res=res)
log += 'All 5 retries failed.\n'
return dict(log=log, res='')
def MathVista_acc(result_file):
data = load(result_file)
tot = defaultdict(lambda: 0)
fetch = defaultdict(lambda: 0)
hit = defaultdict(lambda: 0)
lt = len(data)
skill_list = []
for i in range(lt):
item = data.iloc[i]
cate = item['task']
tot['Overall'] += 1
try:
skills = eval(item['skills'])
except SyntaxError:
skills = [item['skills']]
for skill in skills:
if skill not in skill_list:
skill_list.append(skill)
tot[skill] += 1
tot[cate] += 1
if item['log'] == 'Prefetch succeed':
fetch['Overall'] += 1
fetch[cate] += 1
for skill in skills:
fetch[skill] += 1
if post_check(item, prefetch=False):
hit['Overall'] += 1
hit[cate] += 1
for skill in skills:
hit[skill] += 1
res = defaultdict(list)
for k in tot.keys():
res['Task&Skill'].append(k)
res['tot'].append(tot[k])
res['prefetch'].append(fetch[k])
res['hit'].append(hit[k])
res['prefetch_rate'].append(fetch[k] / tot[k] * 100)
res['acc'].append(hit[k] / tot[k] * 100)
res = pd.DataFrame(res)
return res
def MathVista_eval(eval_file, **judge_kwargs):
logger = get_logger('Evaluation')
model = judge_kwargs['model']
suffix = eval_file.split('.')[-1]
storage = eval_file.replace(f'.{suffix}', f'_{model}.xlsx')
tmp_file = eval_file.replace(f'.{suffix}', f'_{model}.pkl')
nproc = judge_kwargs.pop('nproc', 4)
if osp.exists(storage):
logger.warning(f'GPT scoring file {storage} already exists, will reuse it in MathVista_eval. ')
else:
data = load(eval_file)
model = build_judge(max_tokens=128, **judge_kwargs)
lt = len(data)
lines = [data.iloc[i] for i in range(lt)]
tups = [(model, line) for line in lines]
indices = [line['index'] for line in lines]
ans = {}
if osp.exists(tmp_file):
ans = load(tmp_file)
tups = [x for x, i in zip(tups, indices) if i not in ans]
indices = [i for i in indices if i not in ans]
if len(indices):
new_results = track_progress_rich(
MathVista_auxeval, tups, nproc=nproc, chunksize=nproc,
keys=indices, save=tmp_file)
ans = load(tmp_file)
for k, v in zip(indices, new_results):
assert k in ans
assert ans[k]['log'] == v['log'] and ans[k]['res'] == v['res']
log_map, res_map = {}, {}
all_inds = [line['index'] for line in lines]
for k in all_inds:
log_map[k] = ans[k]['log']
res_map[k] = ans[k]['res']
data['res'] = [res_map[idx] for idx in data['index']]
data['log'] = [log_map[idx] for idx in data['index']]
dump(data, storage)
score = MathVista_acc(storage)
score_pth = storage.replace('.xlsx', '_score.csv')
dump(score, score_pth)
logger.info(f'MathVista_eval successfully finished evaluating {eval_file}, results saved in {score_pth}')
logger.info('Score: ')
logger.info(score)
def parse_args():
parser = argparse.ArgumentParser(description='Inference LLM Answers. ')
parser.add_argument('data', type=str, help='The question set for inference, in excel / tsv / json format. ')
parser.add_argument(
'--model',
type=str,
help='The LLM (GPT) used for inference. ',
default='gpt-4-turbo',
choices=['gpt-4-0613', 'gpt-4-turbo', 'chatgpt-1106', 'chatgpt-0613'])
parser.add_argument('--nproc', type=int, default=4)
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
return args
if __name__ == '__main__':
load_env()
args = parse_args()
judge_kwargs = dict(model=args.model, nproc=args.nproc, verbose=args.verbose)
if 'OPENAI_API_KEY_JUDGE' in os.environ and os.environ['OPENAI_API_KEY_JUDGE']:
judge_kwargs['key'] = os.environ['OPENAI_API_KEY_JUDGE']
if 'OPENAI_API_BASE_JUDGE' in os.environ and os.environ['OPENAI_API_BASE_JUDGE']:
judge_kwargs['api_base'] = os.environ['OPENAI_API_BASE_JUDGE']
MathVista_eval(eval_file=args.data, **judge_kwargs)
import os
from vlmeval.api import OpenAIWrapper, OpenAIWrapperInternal
from vlmeval.smp import load_env
INTERNAL = os.environ.get('INTERNAL', 0)
def build_judge(**kwargs):
model = kwargs.pop('model', None)
load_env()
LOCAL_LLM = os.environ.get('LOCAL_LLM', None)
if LOCAL_LLM is None:
model_map = {
'gpt-4-turbo': 'gpt-4-1106-preview',
'gpt-4-0613': 'gpt-4-0613',
'gpt-4-0314': 'gpt-4-0314',
'gpt-4-0125': 'gpt-4-0125-preview',
'chatgpt-1106': 'gpt-3.5-turbo-1106',
'chatgpt-0613': 'gpt-3.5-turbo-0613',
'chatgpt-0125': 'gpt-3.5-turbo-0125'
}
model_version = model_map[model]
else:
model_version = LOCAL_LLM
if INTERNAL:
model = OpenAIWrapperInternal(model_version, **kwargs)
else:
model = OpenAIWrapper(model_version, **kwargs)
return model
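# A minimal usage sketch: the key below is a placeholder, and the INTERNAL / LOCAL_LLM
# environment variables decide which wrapper class is returned.
# judge = build_judge(model='gpt-4-turbo', temperature=0, verbose=False, key='sk-...')
# verdict = judge.generate('Reply with a single digit: 2 + 2 = ?')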
from vlmeval.evaluate.misc import build_judge
from vlmeval.smp import *
from vlmeval.utils import track_progress_rich
def build_mmvet_gpt4_prompt(line):
question = line['question']
gt = str(line['answer'])
prediction = str(line['prediction'])
prompt = """
Compare the ground truth and prediction from AI models, to give a correctness score for the prediction.
<AND> in the ground truth means it is totally right
only when all elements in the ground truth are present in the prediction,
and <OR> means it is totally right when any one element in the ground truth is present in the prediction.
The correctness score is 0.0 (totally wrong), 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, or 1.0 (totally right).
Just complete the last space of the correctness score.
Question | Ground truth | Prediction | Correctness
--- | --- | --- | ---
What is x in the equation? | -1 <AND> -5 | x = 3 | 0.0
What is x in the equation? | -1 <AND> -5 | x = -1 | 0.5
What is x in the equation? | -1 <AND> -5 | x = -5 | 0.5
What is x in the equation? | -1 <AND> -5 | x = -5 or 5 | 0.5
What is x in the equation? | -1 <AND> -5 | x = -1 or x = -5 | 1.0
Can you explain this meme? | This meme is poking fun at the fact that the names of the countries
Iceland and Greenland are misleading. Despite its name, Iceland is known for its beautiful green landscapes,
while Greenland is mostly covered in ice and snow. The meme is saying that the person has trust issues
because the names of these countries do not accurately represent their landscapes. |
The meme talks about Iceland and Greenland. It's pointing out that despite their names,
Iceland is not very icy and Greenland isn't very green. | 0.4
Can you explain this meme? | This meme is poking fun at the fact that the names of the countries
Iceland and Greenland are misleading. Despite its name, Iceland is known for its beautiful green landscapes,
while Greenland is mostly covered in ice and snow. The meme is saying that the person has trust issues
because the names of these countries do not accurately represent their landscapes. |
The meme is using humor to point out the misleading nature of Iceland's and Greenland's names.
Iceland, despite its name, has lush green landscapes while Greenland is mostly covered in ice and snow.
The text 'This is why I have trust issues' is a playful way to suggest
that these contradictions can lead to distrust or confusion.
The humor in this meme is derived from the unexpected contrast between the names of the countries
and their actual physical characteristics. | 1.0
"""
gpt4_prompt = prompt + '\n' + ' | '.join(
[question, gt.replace('<AND>', ' <AND> ').replace('<OR>', ' <OR> '), prediction, ''])
return gpt4_prompt
def MMVet_auxeval(model, line):
def float_cvt(s):
try:
return float(s)
except ValueError:
return None
prompt = build_mmvet_gpt4_prompt(line)
log = ''
retry = 5
for i in range(retry):
output = model.generate(prompt, temperature=i * 0.5)
score = float_cvt(output)
if score is None:
log += f'Try {i}: output is {output}, failed to parse.\n'
elif score < 0 or score > 1:
log += f'Try {i}: output is {output}, invalid score: {score}.\n'
else:
log += 'Succeed'
return dict(log=log, score=score)
log += 'All 5 retries failed.\n'
return dict(log=log, score=0.0)
def MMVet_acc(result_file):
data = load(result_file)
tot = defaultdict(lambda: 0)
score = defaultdict(lambda: 0)
lt = len(data)
cate2_list = []
for i in range(lt):
item = data.iloc[i]
cate = item['category']
cate2 = cate.replace(',', '_')
if cate2 not in cate2_list:
cate2_list.append(cate2)
grade = float(item['score'])
cate_list = ['rec', 'ocr', 'know', 'gen', 'spat', 'math']
for capa in cate_list:
if capa in cate:
tot[capa] += 1
score[capa] += grade
tot['Overall'] += 1
tot[cate2] += 1
score['Overall'] += grade
score[cate2] += grade
res = defaultdict(list)
res2 = defaultdict(list)
cate_list.append('Overall')
cate2_list.append('Overall')
for k in cate_list:
res['Category'].append(k)
res['tot'].append(tot[k])
res['acc'].append(score[k] / tot[k] * 100)
for v in cate2_list:
res2['Category'].append(v)
res2['tot'].append(tot[v])
res2['acc'].append(score[v] / tot[v] * 100)
res = pd.DataFrame(res)
res2 = pd.DataFrame(res2)
return res, res2
def MMVet_eval(eval_file, **judge_kwargs):
logger = get_logger('Evaluation')
suffix = eval_file.split('.')[-1]
model = judge_kwargs['model']
storage = eval_file.replace(f'.{suffix}', f'_{model}.xlsx')
tmp_file = eval_file.replace(f'.{suffix}', f'_{model}.pkl')
nproc = judge_kwargs.pop('nproc', 4)
if osp.exists(storage):
logger.warning(f'GPT scoring file {storage} already exists, will reuse it in MMVet_eval. ')
else:
data = load(eval_file)
model = build_judge(max_tokens=3, **judge_kwargs)
lt = len(data)
lines = [data.iloc[i] for i in range(lt)]
tups = [(model, line) for line in lines]
indices = [line['index'] for line in lines]
ans = {}
if osp.exists(tmp_file):
ans = load(tmp_file)
tups = [x for x, i in zip(tups, indices) if i not in ans]
indices = [i for i in indices if i not in ans]
if len(indices):
new_results = track_progress_rich(
MMVet_auxeval, tups, nproc=nproc, chunksize=nproc,
keys=indices, save=tmp_file)
ans = load(tmp_file)
for k, v in zip(indices, new_results):
assert k in ans
assert ans[k]['log'] == v['log'] and ans[k]['score'] == v['score']
log_map, score_map = {}, {}
all_inds = [line['index'] for line in lines]
for k in all_inds:
log_map[k] = ans[k]['log']
score_map[k] = ans[k]['score']
data['score'] = [score_map[idx] for idx in data['index']]
data['log'] = [log_map[idx] for idx in data['index']]
dump(data, storage)
score, score_fine = MMVet_acc(storage)
score_pth = storage.replace('.xlsx', '_score.csv')
score_fine_pth = storage.replace('.xlsx', '_score_fine.csv')
dump(score, score_pth)
dump(score_fine, score_fine_pth)
logger.info(
f'MMVet_eval successfully finished evaluating {eval_file}, '
f'results saved in {score_pth} and {score_fine_pth}'
)
logger.info('Score: ')
logger.info(score)
def parse_args():
parser = argparse.ArgumentParser(description='Inference LLM Answers. ')
parser.add_argument('data', type=str, help='The question set for inference, in excel / tsv / json format. ')
parser.add_argument(
'--model',
type=str,
help='The LLM (GPT) used for inference. ',
default='gpt-4-turbo',
choices=['gpt-4-0613', 'gpt-4-turbo', 'chatgpt-1106', 'chatgpt-0613'])
parser.add_argument('--nproc', type=int, default=4)
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
return args
if __name__ == '__main__':
load_env()
args = parse_args()
judge_kwargs = dict(model=args.model, nproc=args.nproc, verbose=args.verbose)
if 'OPENAI_API_KEY_JUDGE' in os.environ and os.environ['OPENAI_API_KEY_JUDGE']:
judge_kwargs['key'] = os.environ['OPENAI_API_KEY_JUDGE']
if 'OPENAI_API_BASE_JUDGE' in os.environ and os.environ['OPENAI_API_BASE_JUDGE']:
judge_kwargs['api_base'] = os.environ['OPENAI_API_BASE_JUDGE']
MMVet_eval(eval_file=args.data, **judge_kwargs)
# Copyright (c) OpenMMLab. All rights reserved.
# Partly adopted from https://github.com/GT-Vision-Lab/VQA
# Copyright (c) 2014, Aishwarya Agrawal
import re
from vlmeval.smp import *
from typing import Optional
from functools import partial
def _process_digit_article(inText):
outText = []
tempText = inText.lower().split()
articles = ['a', 'an', 'the']
manualMap = {
'none': '0',
'zero': '0',
'one': '1',
'two': '2',
'three': '3',
'four': '4',
'five': '5',
'six': '6',
'seven': '7',
'eight': '8',
'nine': '9',
'ten': '10',
}
contractions = {
'aint': "ain't",
'arent': "aren't",
'cant': "can't",
'couldve': "could've",
'couldnt': "couldn't",
"couldn'tve": "couldn't've",
"couldnt've": "couldn't've",
'didnt': "didn't",
'doesnt': "doesn't",
'dont': "don't",
'hadnt': "hadn't",
"hadnt've": "hadn't've",
"hadn'tve": "hadn't've",
'hasnt': "hasn't",
'havent': "haven't",
'hed': "he'd",
"hed've": "he'd've",
"he'dve": "he'd've",
'hes': "he's",
'howd': "how'd",
'howll': "how'll",
'hows': "how's",
"Id've": "I'd've",
"I'dve": "I'd've",
'Im': "I'm",
'Ive': "I've",
'isnt': "isn't",
'itd': "it'd",
"itd've": "it'd've",
"it'dve": "it'd've",
'itll': "it'll",
"let's": "let's",
'maam': "ma'am",
'mightnt': "mightn't",
"mightnt've": "mightn't've",
"mightn'tve": "mightn't've",
'mightve': "might've",
'mustnt': "mustn't",
'mustve': "must've",
'neednt': "needn't",
'notve': "not've",
'oclock': "o'clock",
'oughtnt': "oughtn't",
"ow's'at": "'ow's'at",
"'ows'at": "'ow's'at",
"'ow'sat": "'ow's'at",
'shant': "shan't",
"shed've": "she'd've",
"she'dve": "she'd've",
"she's": "she's",
'shouldve': "should've",
        'shouldnt': "shouldn't",
        "shouldnt've": "shouldn't've",
        "shouldn'tve": "shouldn't've",
        'somebodyd': "somebody'd",
"somebodyd've": "somebody'd've",
"somebody'dve": "somebody'd've",
'somebodyll': "somebody'll",
'somebodys': "somebody's",
'someoned': "someone'd",
"someoned've": "someone'd've",
"someone'dve": "someone'd've",
'someonell': "someone'll",
'someones': "someone's",
'somethingd': "something'd",
"somethingd've": "something'd've",
"something'dve": "something'd've",
'somethingll': "something'll",
'thats': "that's",
'thered': "there'd",
"thered've": "there'd've",
"there'dve": "there'd've",
'therere': "there're",
'theres': "there's",
'theyd': "they'd",
"theyd've": "they'd've",
"they'dve": "they'd've",
'theyll': "they'll",
'theyre': "they're",
'theyve': "they've",
'twas': "'twas",
'wasnt': "wasn't",
"wed've": "we'd've",
"we'dve": "we'd've",
'weve': "we've",
'werent': "weren't",
'whatll': "what'll",
'whatre': "what're",
'whats': "what's",
'whatve': "what've",
'whens': "when's",
'whered': "where'd",
'wheres': "where's",
'whereve': "where've",
'whod': "who'd",
"whod've": "who'd've",
"who'dve": "who'd've",
'wholl': "who'll",
'whos': "who's",
'whove': "who've",
'whyll': "why'll",
'whyre': "why're",
'whys': "why's",
'wont': "won't",
'wouldve': "would've",
'wouldnt': "wouldn't",
"wouldnt've": "wouldn't've",
"wouldn'tve": "wouldn't've",
'yall': "y'all",
"yall'll": "y'all'll",
"y'allll": "y'all'll",
"yall'd've": "y'all'd've",
"y'alld've": "y'all'd've",
"y'all'dve": "y'all'd've",
'youd': "you'd",
"youd've": "you'd've",
"you'dve": "you'd've",
'youll': "you'll",
'youre': "you're",
'youve': "you've",
}
for word in tempText:
word = manualMap.setdefault(word, word)
if word not in articles:
outText.append(word)
for wordId, word in enumerate(outText):
if word in contractions:
outText[wordId] = contractions[word]
outText = ' '.join(outText)
return outText
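# Examples of the normalisation above (number words mapped to digits, articles dropped,
# common contractions restored):
# _process_digit_article('There is a cat and two dogs')  ->  'there is cat and 2 dogs'
# _process_digit_article('it doesnt fly')                ->  "it doesn't fly"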
def hit_calculate(result, dataset_name, anls_threshold=0.5):
if listinstr(['TextVQA'], dataset_name):
return [np.mean(x['match']) for x in result]
elif listinstr(['DocVQA', 'InfoVQA'], dataset_name):
# return [1 - np.min(x['match']) >= anls_threshold for x in result]
return [0.0 if 1 - np.min(x['match']) < anls_threshold else 1 - np.min(x['match']) for x in result]
elif listinstr(['ChartQA', 'OCRVQA'], dataset_name):
return [np.max(x['match']) for x in result]
else: # default using vqa_score to calculate score
return [np.mean(x['match']) for x in result]
# https://github.com/google-research/pix2struct/blob/main/pix2struct/metrics.py#L81
def relaxed_correctness(target: str,
prediction: str,
max_relative_change: float = 0.05) -> bool:
"""Calculates relaxed correctness.
The correctness tolerates certain error ratio defined by max_relative_change.
See https://arxiv.org/pdf/2203.10244.pdf, end of section 5.1:
“Following Methani et al. (2020), we use a relaxed accuracy measure for the
numeric answers to allow a minor inaccuracy that may result from the automatic
data extraction process. We consider an answer to be correct if it is within
5% of the gold answer. For non-numeric answers, we still need an exact match
to consider an answer to be correct.”
Args:
target: Target string.
prediction: Predicted string.
max_relative_change: Maximum relative change.
Returns:
Whether the prediction was correct given the specified tolerance.
"""
def _to_float(text: str) -> Optional[float]:
try:
if text.endswith('%'):
# Convert percentages to floats.
return float(text.rstrip('%')) / 100.0
else:
return float(text)
except ValueError:
return None
prediction = str(prediction)
target = str(target)
prediction_float = _to_float(prediction)
target_float = _to_float(target)
if prediction_float is not None and target_float:
relative_change = abs(prediction_float - target_float) / abs(target_float)
return relative_change <= max_relative_change
else:
return prediction.lower() == target.lower()
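# Examples of the 5% tolerance: numeric answers may deviate slightly, non-numeric answers must
# match exactly (case-insensitive). Percentages are converted to floats first.
# relaxed_correctness('100', '96')   ->  True   (4% relative change)
# relaxed_correctness('100', '94')   ->  False  (6% relative change)
# relaxed_correctness('25%', '0.24') ->  True
# relaxed_correctness('cat', 'Cat')  ->  True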
def levenshtein_distance(s1, s2):
if len(s1) > len(s2):
s1, s2 = s2, s1
distances = range(len(s1) + 1)
for i2, c2 in enumerate(s2):
distances_ = [i2 + 1]
for i1, c1 in enumerate(s1):
if c1 == c2:
distances_.append(distances[i1])
else:
distances_.append(1 + min((distances[i1], distances[i1 + 1], distances_[-1])))
distances = distances_
return distances[-1]
def anls_compute(groundtruth, prediction):
gt_answer = ' '.join(groundtruth.strip().lower().split())
det_answer = ' '.join(prediction.strip().lower().split())
dist = levenshtein_distance(gt_answer, det_answer)
length = max(len(groundtruth.upper()), len(prediction.upper()))
values = 0.0 if length == 0 else float(dist) / float(length)
return values
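# anls_compute returns a normalised edit distance (0 = exact match, 1 = entirely different);
# hit_calculate above converts it to an ANLS score via `1 - distance` with a 0.5 threshold.
# anls_compute('hello world', 'helo world')  ->  1 / 11  (one deletion over 11 characters)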
def process_answer(answer):
answer = answer.replace('\n', ' ')
answer = answer.replace('\t', ' ')
answer = answer.strip()
answer = process_punctuation(answer)
answer = _process_digit_article(answer)
return answer
def process_line(line, method='vqa_score'):
ret = {}
if istype(line['answer'], list):
answers = eval(line['answer'])
else:
answers = [line['answer']]
if method == 'vqa_score':
ret['gt'] = [process_answer(x) for x in answers]
ret['pred'] = process_answer(line['prediction'])
ret['match'] = []
for current_idx, gtAnsDatum in enumerate(ret['gt']):
otherGTAns = [
item for ret_gt_idx, item in enumerate(ret['gt'])
if ret_gt_idx != current_idx
]
matchingAns = [
item for item in otherGTAns if item == ret['pred']
]
acc = min(1, float(len(matchingAns)) / 3)
ret['match'].append(acc)
elif method == 'anls':
ret['gt'] = answers
ret['pred'] = line['prediction']
ret['match'] = [anls_compute(x, ret['pred']) for x in ret['gt']]
elif method == 'relaxed_accuracy':
ret['gt'] = answers
ret['pred'] = line['prediction'].strip()
ret['match'] = [relaxed_correctness(ret['pred'], x) for x in ret['gt']]
elif method == 'accuracy':
ret['gt'] = answers
ret['pred'] = line['prediction'].strip()
ret['match'] = [(1.0 if (x.strip().lower() == ret['pred'].strip().lower()) else 0.0) for x in ret['gt']]
else: # default using vqa_score to calculate score
ret['gt'] = [process_answer(x) for x in answers]
ret['pred'] = process_answer(line['prediction'])
ret['match'] = [x == ret['pred'] for x in ret['gt']]
return ret
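# A worked example of the 'vqa_score' branch with a toy row (not dataset data): for each
# ground-truth annotation, the prediction is compared against the *other* annotations and
# credited min(1, matches / 3); hit_calculate later averages the per-annotation scores.
# line = pd.Series({'answer': "['2', '2', '2', 'blue']", 'prediction': '2'})
# process_line(line, method='vqa_score')['match']  ->  [2/3, 2/3, 2/3, 1.0]   (mean 0.75)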
def VQAEval(eval_file, dataset_name, **kwargs):
logger = get_logger('Evaluation')
data = load(eval_file)
assert 'answer' in data and 'prediction' in data
data['prediction'] = [str(x) for x in data['prediction']]
data['answer'] = [str(x) for x in data['answer']]
lt = len(data)
pool = mp.Pool(16)
lines = [data.iloc[i] for i in range(lt)]
if listinstr(['TextVQA'], dataset_name):
res = pool.map(partial(process_line, method='vqa_score'), lines)
elif listinstr(['ChartQA'], dataset_name):
res = pool.map(partial(process_line, method='relaxed_accuracy'), lines)
elif listinstr(['OCRVQA'], dataset_name):
res = pool.map(partial(process_line, method='accuracy'), lines)
elif listinstr(['DocVQA', 'InfoVQA'], dataset_name):
res = pool.map(partial(process_line, method='anls'), lines)
else: # default using vqa_score to calculate score
res = pool.map(process_line, lines)
# [np.mean(x['match']) >= full_score_weight for x in res]
hit = hit_calculate(res, dataset_name)
ret = dict()
if 'split' in data:
splits = set(data['split'])
for sp in splits:
sub = [r for l, r in zip(lines, res) if l['split'] == sp]
# [np.mean(x['match']) >= full_score_weight for x in sub]
hit = hit_calculate(sub, dataset_name)
ret[sp] = np.mean(hit) * 100
sub = [r for l, r in zip(lines, res)]
hit = hit_calculate(sub, dataset_name)
ret['Overall'] = np.mean(hit) * 100
else:
ret['Overall'] = np.mean(hit) * 100
if 'category' in data:
cates = list(set(data['category']))
cates.sort()
for c in cates:
sub = [r for l, r in zip(lines, res) if l['category'] == c]
# [np.mean(x['match']) >= full_score_weight for x in sub]
hit = hit_calculate(sub, dataset_name)
ret[c] = np.mean(hit) * 100
ret = d2df(ret)
    ret = ret.round(2)
suffix = eval_file.split('.')[-1]
result_file = eval_file.replace(f'.{suffix}', '_acc.csv')
logger.info(f'VQA Eval Finished. Saved to {result_file}. ')
logger.info(ret)
dump(ret, result_file)
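# A minimal usage sketch (the path is a placeholder): the dataset name selects the metric
# branch above, e.g. ANLS for DocVQA / InfoVQA, relaxed accuracy for ChartQA, exact-match
# accuracy for OCRVQA, and the VQA score otherwise.
# VQAEval('outputs/MiniCPM-V_DocVQA_VAL.xlsx', 'DocVQA_VAL')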
from .file import *
from .vlm import *
from .misc import *
from .log import *