"include/vscode:/vscode.git/clone" did not exist on "e85178b4ca892a78344271ae64103c9d4d1bfc40"
Commit 58d33d4c authored by wanglch's avatar wanglch
Browse files

Initial commit

parents
Pipeline #1904 canceled with stages
import json
import jsonlines
from docowl_infer import DocOwlInfer
from tqdm import tqdm
import os
from icecream import ic
from evaluation.benchmarks_eval import llm_text_localization_eval
import argparse
def read_jsonl(filename):
lines = []
with open(filename, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
lines.append(line)
return lines
def save_jsonl(data, filename, print_log=True):
"""data is a list"""
with open(filename, "w") as f:
f.write("\n".join([json.dumps(e, ensure_ascii=False) for e in data]))
if print_log:
print('save %d samples to %s' % (len(data), filename))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='docowl1.5 doclocal4k evaluation')
parser.add_argument('--model_path', type=str, help='the directory path of model')
parser.add_argument('--task', type=str, choices=['text_grounding', 'text_recognition'])
parser.add_argument('--doclocal4k_dir', type=str, help='the directory path of DocLocal4K')
parser.add_argument('--save_dir', type=str, help='the directory to save predictions of the model')
args = parser.parse_args()
model_path = args.model_path
task = args.task
doclocal4k_dir = args.doclocal4k_dir
save_dir = args.save_dir
if not os.path.exists(save_dir):
os.makedirs(save_dir)
test_path = os.path.join(doclocal4k_dir, task+'.jsonl')
save_path = os.path.join(save_dir, task+'_test_pred.jsonl')
if os.path.exists(save_path):
print(save_path+' exists, skip inference. ')
else:
docowl = DocOwlInfer(ckpt_path=model_path, anchors='grid_9', add_global_img=False)
print('load model from ', model_path)
# infer the test samples one by one
test_samples = read_jsonl(test_path)
infer_results = []
for sample in tqdm(test_samples):
image = os.path.join(doclocal4k_dir, sample['image'][0])
assert os.path.exists(image)
question = sample['messages'][0]
answer = sample['messages'][1]
assert question['role'] == 'user'
assert answer['role'] == 'assistant'
query = question['content'].replace('<|image|>', '')
gt_answer = answer['content']
model_answer = docowl.inference(image, query)
sample['model_answer'] = model_answer
sample['gt_answer'] = gt_answer
ic(model_answer, gt_answer)
infer_results.append(sample)
save_jsonl(infer_results, save_path)
# calculate metrics
pred_path = save_path
if not os.path.exists(pred_path):
print('not exists:', pred_path)
exit(0)
if task == 'text_recognition':
llm_text_localization_eval(metric_names=['BLEU1', 'BLEU2', 'BLEU3', 'BLEU4'], result_path=pred_path, save_each_eval=True)
elif task == 'text_grounding':
llm_text_localization_eval(metric_names=['IOU@0.5'], result_path=pred_path, save_each_eval=True)
print('==============================================')
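# Example invocation (the script name and all paths are placeholders, not taken from this repo):
#   python docowl1.5_doclocal4k_eval.py --model_path /path/to/DocOwl1.5-Omni \
#       --task text_grounding --doclocal4k_dir /path/to/DocLocal4K --save_dir ./doclocal4k_preds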
import torch
from PIL import Image
from transformers import TextStreamer
import os
from mplug_docowl.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN
from mplug_docowl.conversation import conv_templates, SeparatorStyle
from mplug_docowl.model.builder import load_pretrained_model
from mplug_docowl.mm_utils import process_images, tokenizer_image_token, get_model_name_from_path, KeywordsStoppingCriteria
from mplug_docowl.processor import DocProcessor
from icecream import ic
import time
class DocOwlInfer():
def __init__(self, ckpt_path, anchors='grid_9', add_global_img=True, load_8bit=False, load_4bit=False):
model_name = get_model_name_from_path(ckpt_path)
ic(model_name)
self.tokenizer, self.model, _, _ = load_pretrained_model(ckpt_path, None, model_name, load_8bit=load_8bit, load_4bit=load_4bit, device="cuda")
self.doc_image_processor = DocProcessor(image_size=448, anchors=anchors, add_global_img=add_global_img, add_textual_crop_indicator=True)
self.streamer = TextStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)
def inference(self, image, query):
image_tensor, patch_positions, text = self.doc_image_processor(images=image, query='<|image|>'+query)
image_tensor = image_tensor.to(self.model.device, dtype=torch.float16)
patch_positions = patch_positions.to(self.model.device)
# ic(image_tensor.shape, patch_positions.shape, text)
conv = conv_templates["mplug_owl2"].copy()
roles = conv.roles # ("USER", "ASSISTANT")
conv.append_message(conv.roles[0], text)
conv.append_message(conv.roles[1], None)
prompt = conv.get_prompt()
# ic(prompt)
input_ids = tokenizer_image_token(prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).to(self.model.device)
# ic(input_ids)
stop_str = conv.sep2
keywords = [stop_str]
stopping_criteria = KeywordsStoppingCriteria(keywords, self.tokenizer, input_ids)
with torch.inference_mode():
output_ids = self.model.generate(
input_ids,
images=image_tensor,
patch_positions=patch_positions,
do_sample=False,
temperature=1.0,
max_new_tokens=512,
streamer=self.streamer,
use_cache=True,
stopping_criteria=[stopping_criteria])
outputs = self.tokenizer.decode(output_ids[0, input_ids.shape[1]:]).strip()
return outputs.replace('</s>', '')
if __name__ == '__main__':
model_path = '/home/wanglch/mPLUG-DocOwl/DocOwl1.5-Omni-base/'
docowl = DocOwlInfer(ckpt_path=model_path, anchors='grid_9', add_global_img=True)
print('load model from ', model_path)
# exit(0)
qas = [
# image description examples (query "详细描述这张图片" = "Describe this image in detail")
{"image_path":"/home/wanglch/mPLUG-DocOwl/image/hp.jpg",
"question":"详细描述这张图片"},
{"image_path":"/home/wanglch/mPLUG-DocOwl/image/R-C.jpg",
"question":"详细描述这张图片"},
]
for qa in qas:
image = qa['image_path']
query = qa['question']
start_time = time.time()
## give relatively longer answer
answer = docowl.inference(image, query)
end_time = time.time()
cost_seconds = end_time-start_time
## answer with detailed explanation
# query = qa['question']+'Answer the question with detailed explanation.'
# answer = docowl.inference(image, query)
ic(image)
ic(query, answer)
ic(cost_seconds)
# ic(query_simple, answer_simple)
print('==================')
import jsonlines
import json
from icecream import ic
import re
from evaluator import doc_evaluate
import os
from tqdm import tqdm
import random
from pathlib import Path
def parser_line(line):
image = line['image'][0]
assert len(line['messages']) == 2
assert line['messages'][0]['role'] == 'user'
question = line['messages'][0]['content'].replace('<|image|>', '')
predicted_answer = line['model_answer'].replace('\n', '').strip()
gt_answer = line['gt_answer'].replace('\n', '').strip()
return image, question, predicted_answer, gt_answer
def parser_ground_line(line):
task_name = line['task_name'] # e.g. paragraph_bbox2t_sft
obj=task_name.split('_')[0]
image = line['image'][0]
assert 'messages' in line
assert len(line['messages']) == 2
assert line['messages'][0]['role'] == 'user'
question = line['messages'][0]['content'].replace('<|image|>', '')
task_name = line['task_name']
if 't2bbox' in task_name:
gt_answer = line['gt_answer'].strip().replace('<bbox>', '').replace('</bbox>','')
gt_answer = [max(min(int(x)/999, 1.0), 0.0) for x in gt_answer.split(',')]
model_answer = line['model_answer'].strip().replace('<bbox>', '').replace('</bbox>','')
try:
model_answer = [max(min(int(x)/999, 1.0), 0.0) for x in model_answer.split(',')]
except Exception as e:
model_answer = [0.0,0.0,0.0,0.0]
try:
assert len(model_answer) == 4
except AssertionError as e:
# ic(line)
model_answer = [0.0,0.0,0.0,0.0]
# exit(0)
else:
assert 'bbox2t' in task_name
model_answer = line['model_answer'].strip().replace('<ocr>', '').replace('</ocr>','')
model_answer = model_answer.strip()
gt_answer = line['gt_answer'].strip().replace('<ocr>', '').replace('</ocr>','')
gt_answer = gt_answer.strip()
return image, question, model_answer, gt_answer, obj
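# Illustrative input line (schema inferred from the parsers above; values are made up):
#   {"task_name": "word_t2bbox_sft", "image": ["DocLocal4K/imgs/xxx.png"],
#    "messages": [{"role": "user", "content": "<|image|>..."}, {"role": "assistant", "content": "..."}],
#    "model_answer": "<bbox>24, 102, 975, 198</bbox>", "gt_answer": "<bbox>25, 100, 974, 199</bbox>"}
# For t2bbox tasks each coordinate is divided by 999 and clipped to [0, 1], e.g.
# "<bbox>24, 102, 975, 198</bbox>" -> [0.024, 0.102, 0.976, 0.198] (rounded here for readability).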
def save_jsonl(data, filename):
"""data is a list"""
with open(filename, "w") as f:
f.write("\n".join([json.dumps(e, ensure_ascii=False) for e in data]))
print('save %d samples to %s' % (len(data), filename))
def llm_benchmark_eval(metric_names=['ContainAccuracy'], result_path='', save_each_eval=True):
if not Path(result_path).exists():
ic('not exists',result_path)
return
ic(result_path)
gts = []
preds = []
imgs = []
ques = []
with open(result_path, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
img, question, model_answer, gt_answer = parser_line(line)
if gt_answer.endswith('.'):
gt_answer = gt_answer[:-1]
imgs.append(img)
gts.append([gt_answer])
preds.append(model_answer)
ques.append(question)
ic(len(gts), len(preds))
metric2scores = {}
for metric_name in metric_names:
score, scores = doc_evaluate(metric=metric_name, targets=gts, predictions=preds)
ic(metric_name, score)
metric2scores[metric_name] = scores
if save_each_eval:
save_path = result_path.replace('.jsonl', '_metrics.jsonl')
eval_result = []
for i in range(len(imgs)):
# assert len(scores) == len(imgs)
eval_result.append({
'metric2score': [{'metric':metric, 'score': scores[i]} for metric, scores in metric2scores.items()],
'image':imgs[i],
'question': ques[i],
'gt': gts[i][0],
'pred': preds[i]})
save_jsonl(eval_result, save_path)
def llm_text_localization_eval(metric_names=['BLEU1', 'BLEU2', 'BLEU3', 'BLEU4'], result_path='', save_each_eval=True):
if not Path(result_path).exists():
ic('not exists',result_path)
return
ic(result_path)
gts = []
preds = []
imgs = []
ques = []
objs = []
with open(result_path, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
img, question, model_answer, gt_answer, obj = parser_ground_line(line)
# model_answer = model_answer.strip()
if isinstance(gt_answer, str) and isinstance(model_answer, str):
if gt_answer.endswith('.'):
gt_answer = gt_answer[:-1]
imgs.append(img)
gts.append([gt_answer])
preds.append(model_answer)
ques.append(question)
objs.append(obj)
ic(len(gts), len(preds))
metric2scores = {}
metric2score = {}
for metric_name in metric_names:
score, scores = doc_evaluate(metric=metric_name, targets=gts, predictions=preds)
# ic(metric_name, score)
metric2scores[metric_name] = scores
metric2score[metric_name]=str(round(score,2))
# calculate metric of each type of object (word, phrase, line, paragraph)
obj2metrics = {}
for metric_name in metric_names:
scores = metric2scores[metric_name]
obj2scores = {}
for i, obj in enumerate(objs):
score = scores[i]
if obj not in obj2scores:
obj2scores[obj] = []
obj2scores[obj].append(score)
for obj, scores in obj2scores.items():
num=len(scores)
if metric_name == 'IOU@0.5':
score = round(100*sum(scores)/len(scores), 2)
else:
score = round(sum(scores)/len(scores), 2)
# ic(metric_name, obj, num, score)
if obj == 'word' and metric_name in ['BLEU2', 'BLEU3', 'BLEU4']:
continue
if obj == 'phrase' and metric_name in ['BLEU1', 'BLEU3', 'BLEU4']:
continue
if obj == 'line' and metric_name in ['BLEU1', 'BLEU2', 'BLEU4']:
continue
if obj == 'paragraph' and metric_name in ['BLEU1', 'BLEU2', 'BLEU3']:
continue
obj2metrics[obj+'_'+metric_name] = score
# print('---------------------------')
ic(obj2metrics)
if 'BLEU1' in metric_names: # recognition evaluation
ave = round(sum(obj2metrics.values())/len(obj2metrics.values()), 2)
ic(ave)
else: # grounding evaluation
ave = metric2score['IOU@0.5']
ic(ave)
if save_each_eval:
save_path = result_path.replace('.jsonl', '_metrics.jsonl')
eval_result = []
for i in range(len(imgs)):
# assert len(scores) == len(imgs)
eval_result.append({
'metric2score': [{'metric':metric, 'score': scores[i]} for metric, scores in metric2scores.items()],
'image':imgs[i],
'question': ques[i],
'gt': gts[i][0],
'pred': preds[i]})
save_jsonl(eval_result, save_path)
def llm_textcaps_textvqa_eval(result_path, dataset='TextVQA', split='test', meta_dir=''):
if dataset == 'TextVQA':
question_ids_path = os.path.join(meta_dir, dataset, split+'_q_ids.json')
if not os.path.exists(question_ids_path):
qa_path = os.path.join(meta_dir, dataset, 'TextVQA_0.5.1_'+split+'.json')
raw_qa_data = json.load(open(qa_path, 'r', encoding='utf-8'))
raw_qa_data = raw_qa_data['data']
# group QAs belonging to the same image
print('collecting QAs......')
img2qas = {}
que_num = 0
for qa in tqdm(raw_qa_data):
if dataset == 'TextVQA':
imgid = qa['image_id']
question = qa['question']
q_id = qa['question_id']
if imgid not in img2qas:
img2qas[imgid] = {}
img2qas[imgid][question] = q_id
que_num+=1
ic(que_num)
json.dump(img2qas, open(question_ids_path, 'w', encoding='utf-8'))
print('save question ids to ', question_ids_path)
q_ids = json.load(open(question_ids_path, 'r', encoding='utf-8'))
llm_results = []
with open(result_path, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
img = line['image'][0]
imgid = img.split('/')[-1].replace('.jpg', '')
assert line['messages'][0]['role'] == 'user'
question = line['messages'][0]['content'].replace('<|image|>', '')
if dataset == 'TextVQA':
q_id = q_ids[imgid][question]
# gt_answer = str(line['gt_answer']).replace('\n', '')
model_answer = str(line['model_answer'].strip()).replace('\n', '')
# ic(imgid, question, model_answer)
if model_answer.endswith('.'):
model_answer = model_answer[:-1]
llm_results.append({'question_id':q_id, 'answer':model_answer})
else:
llm_results = []
img2captions = {}
with open(result_path, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
img = line['image'][0]
imgid = img.split('/')[-1].replace('.jpg', '')
model_answer = str(line['model_answer']).replace('\n', '')
# ic(imgid, model_answer)
if imgid not in img2captions:
img2captions[imgid] = []
img2captions[imgid].append(model_answer)
for imgid, captions in img2captions.items():
llm_results.append({'image_id':imgid, 'caption':random.choice(captions)})
ic(len(llm_results))
save_path = result_path.replace('.jsonl', '_official_eval.json')
json.dump(llm_results, open(save_path, 'w', encoding='utf-8'))
print('save LLM predictions in the official format to ', save_path)
if split == 'test':
print('!!!!!! upload this file to official website for evaluation !!!!!')
import json
from icecream import ic
import jsonlines
import copy
import random
import os
from due_evaluator.due_evaluator import DueEvaluator
def dataset2metrics(dataset_name):
if dataset_name in ['DocVQA', 'InfographicsVQA']:
return ['ANLS']
elif dataset_name in ['KleisterCharity', 'DeepForm']:
return ['F1']
elif dataset_name in ['TabFact']:
return ['F1']
elif dataset_name in ['PWC']:
return ['GROUP-ANLS']
elif dataset_name in ['WikiTableQuestions']:
return ['WTQ']
else:
print('unsupported dataset:', dataset_name)
def eval_due(dataset_name, pred_path, gt_path):
metrics = dataset2metrics(dataset_name)
preds = read_jsonl(pred_path)
gts = read_jsonl(gt_path)
print('pred %d, gt %d' % (len(preds), len(gts)))
for metric in metrics:
evaluator = DueEvaluator(reference=gts,
answers=preds,
ignore_case=True,
metric=metric)
general_scorer, label_scorers = evaluator._evalute()
ic('Overall %s:%.4f' % (metric, general_scorer.score()))
"""for label, scorer in label_scorers.items():
print('%s %s:%.4f' % (label, metric, scorer.score()))"""
def read_jsonl(path):
data = []
with open(path, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
data.append(line)
return data
def save_jsonl(data, path):
with open(path,'w')as f:
for line in data:
f.write(json.dumps(line, ensure_ascii=False) +'\n')
print('save %d samples(imgs) to %s ' % (len(data), path))
def add_tabfact_missing_img(due_preds, meta_dir):
ref_path = meta_dir + 'TabFact/test/document.jsonl'
new_due_preds = []
i = -1
with open(ref_path, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
i+=1
if due_preds[i]['name'] == line['name']:
"""# copy raw statement from anno file, avoid small revisions
img = {'name':line['name'], 'annotations':[]}
for i, anno in enumerate(line['annotations']):
pred_value = due_preds[i]['annotations']['values'][0]['value']
img['annotations'].append({'key':anno['key'], 'values':[{'value':pred_value}]})
new_due_preds.append(img)"""
new_due_preds.append(due_preds[i])
continue
else:
print('add random prediction for missing img:', line['name'])
img = {'name':line['name'], 'annotations':[]}
for anno in line['annotations']:
img['annotations'].append({'key':anno['key'], 'values':[{'value':random.choice(['0', '1'])}]})
new_due_preds.append(img)
i-=1
return new_due_preds
def llm_duebenchmark_eval(dataset_name, split, llm_pred_path, meta_dir):
"""
reformat results by LLM for due-benchmark evaluation
"""
assert dataset_name in ['DocVQA', 'InfographicsVQA', 'WikiTableQuestions', 'DeepForm', 'KleisterCharity', 'TabFact']
ic(dataset_name)
if dataset_name == 'DeepForm':
dataset_categories = ['advertiser', 'flight_from', 'flight_to', 'gross_amount', 'contract_num']
elif dataset_name == 'KleisterCharity':
dataset_categories = ['address__post_town',
'address__postcode',
'address__street_line',
'charity_name',
'charity_number',
'income_annually_in_british_pounds',
'report_date',
'spending_annually_in_british_pounds']
preds = []
with open(llm_pred_path, 'r', encoding='utf-8') as f:
for line in jsonlines.Reader(f):
assert len(line['messages']) == 2
assert line['messages'][0]['role'] == 'user'
question = line['messages'][0]['content'].replace('<|image|>', '')
preds.append({
'name':line['image'][0],
'question': question,
'answer':str(line['model_answer']).strip().replace('\n', '')})
meta_path = os.path.join(meta_dir, dataset_name, split, 'metadata.jsonl')
meta_data = read_jsonl(meta_path)
ic(len(meta_data), len(preds))
assert len(meta_data) == len(preds)
for i in range(len(meta_data)):
preds[i]['name'] = meta_data[i]['file_name'].split('/')[-1].split('.pdf')[0]
# for the ie task, convert the category question to the category name
if dataset_name in ['DeepForm', 'KleisterCharity']:
cate_question = json.loads(meta_data[i]['ground_truth'])['gt_parses'][0]['question']
for cate in dataset_categories:
if cate in cate_question:
preds[i]['question'] = cate
break
# for the qa task, the question must be copied from the gt file because questions in preds may contain minor revisions;
# keeping the question consistent with the gt file is necessary for due evaluation
else:
preds[i]['question'] = json.loads(meta_data[i]['ground_truth'])['gt_parses'][0]['question']
if dataset_name == 'TabFact':
if preds[i]['answer'].lower() == 'true':
preds[i]['answer'] = '1'
else:
assert preds[i]['answer'].lower() == 'false'
preds[i]['answer'] = '0'
# reorganize preds so that each line holds the QA pairs or category-value pairs of one image
due_preds = []
img = {}
for i in range(len(preds)):
pred = preds[i]
if 'name' not in img: # start img
img['name'] = pred['name']
img['annotations'] = []
elif pred['name'] != img['name']: # save previous img results and init a new one
due_preds.append(copy.deepcopy(img))
img = {}
img['name'] = pred['name']
img['annotations'] = []
# for ie task, if the answer is none, drop the category-value pair
if dataset_name not in ['DeepForm', 'KleisterCharity'] or pred['answer'] != 'None':
img['annotations'].append({'key':pred['question'], 'values':[{'value':pred['answer']}]})
if i == len(preds)-1:
due_preds.append(copy.deepcopy(img))
if dataset_name == 'TabFact':
due_preds = add_tabfact_missing_img(due_preds, meta_dir)
save_path = llm_pred_path.replace('.jsonl', '_due.jsonl')
save_jsonl(due_preds, save_path)
gt_path = os.path.join(meta_dir, dataset_name, split, 'document.jsonl')
eval_due(dataset_name, save_path, gt_path)
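# Illustrative usage (paths are placeholders):
#   llm_duebenchmark_eval('DocVQA', 'test', '/path/to/docvqa_test_pred.jsonl', '/path/to/DUE_meta/')
# This rewrites the predictions into the DUE format (saved as *_due.jsonl) and then runs eval_due
# on them against the corresponding document.jsonl.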
from .__main__ import cli_main
from .due_evaluator import DueEvaluator
__all__ = ['DueEvaluator', 'cli_main']
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
from typing import Optional, Set
import json
from due_evaluator.due_evaluator import DueEvaluator
from due_evaluator.utils import property_scores_to_string
def parse_args():
"""Parse CLI arguments.
Returns:
namespace: namespace with parsed variables.
"""
parser = argparse.ArgumentParser('Document Understanding Evaluator')
parser.add_argument(
'--out-files',
'-o',
type=argparse.FileType('r', encoding='utf-8'),
required=True,
nargs='+',
help='Out file to evaluate',
)
parser.add_argument(
'--reference', '-r', type=argparse.FileType('r', encoding='utf-8'), required=True, help='Reference file',
)
parser.add_argument('--metric', '-m', type=str, default='F1', choices=['F1', 'MEAN-F1', 'ANLS', 'WTQ', 'GROUP-ANLS'])
parser.add_argument(
'--return-score',
default='F1',
choices=['F1', 'mean-F1', 'ANLS', 'mean-Precision', 'mean-Recall', 'WTQ'],
help='Return WR-like mean-F1 score',
)
parser.add_argument('--line-by-line', action='store_true', default=False, help='Return results example by example')
parser.add_argument(
'--columns', type=str, nargs='+', default=['Precision', 'Recall', 'F1'], help='Columns',
)
parser.add_argument(
'--print-format',
default='text',
type=str,
choices=['text', 'latex', 'json'],
help='Print feature table in the given format',
)
parser.add_argument('--properties', nargs='+', type=str, help='Property set to be limited to')
parser.add_argument(
'--ignore-case', '-i', action='store_true', default=False, help='Ignore letter case when comparing values',
)
return parser.parse_args()
def cli_main(args: argparse.Namespace):
"""CLI main.
Args:
args: cli arguments
"""
reference = [json.loads(line) for line in args.reference]
evaluators = []
for out_file in args.out_files:
predictions = [json.loads(line) for line in out_file]
property_set: Optional[Set[str]]
if args.properties:
property_set = args.properties
else:
property_set = None
evaluators.append(
DueEvaluator(reference, predictions, property_set, args.ignore_case, out_file.name, args.metric)
)
prop_str = property_scores_to_string(evaluators, args.print_format, args.columns)
if args.print_format != 'json':
print(prop_str, file=sys.stderr)
if args.line_by_line:
for idx, score in enumerate(evaluators[0].line_by_line()):
print(f'{idx}: {score}', file=sys.stderr)
return prop_str
def main() -> None:
"""Main."""
args = parse_args()
cli_main(args)
if __name__ == '__main__':
main()
"""Version specification."""
VERSION = (0, 0, 8)
__version__ = '.'.join(map(str, VERSION))
import sys
from collections import defaultdict
from typing import Callable, DefaultDict, List, Optional, Set, Tuple, TypeVar, Union, Generic
from copy import deepcopy
from due_evaluator.scorers import AnlsScorer, BaseScorer, FScorer, MeanFScorer, WtqScorer, GevalScorer, GroupAnlsScorer
TScorer = TypeVar("TScorer", bound=BaseScorer)
class DueEvaluator:
"""Due Evaluator."""
def __init__(
self,
reference: List[List[dict]],
answers: List[List[dict]],
property_set: Optional[Set[str]] = None,
ignore_case: bool = False,
path: Optional[str] = None,
metric: Optional[str] = 'F1',
):
"""Initialize DueEvaluator.
Arguments:
reference: reference
answers: answers to be evaluated
property_set: if given, the score will be computed taking into account only these properties.
ignore_case: if true, compute scores ignoring casing.
path: optional, the path to the evaluated file.
metric: name of the metric used to build the scorers (default: 'F1').
"""
self.reference = reference
self.answers = answers
self.property_set = property_set
self.ignore_case = ignore_case
self.metric = metric
self.__path = path
self.__general_scorer, self.__property_scorers = self._evalute()
@property
def general_scorer(self) -> BaseScorer:
"""Get general scorer.
Returns:
FScorer: the general scorer.
"""
return self.__general_scorer
@property
def property_scorers(self) -> DefaultDict[str, BaseScorer]:
"""Get a scorer for each property.
Returns:
DefaultDict[str, BaseScorer]: a scorer per property.
"""
return self.__property_scorers
@property
def path(self) -> Optional[str]:
Return the path of the evaluated file, or None when not evaluating a file.
Returns:
Optional[str]: the path of the evaluated file or None.
"""
return self.__path
def create_scorer(self) -> BaseScorer:
scorer: BaseScorer
if self.metric == 'F1':
scorer = FScorer()
elif self.metric == 'ANLS':
scorer = AnlsScorer()
elif self.metric == 'MEAN-F1':
scorer = MeanFScorer()
elif self.metric == 'WTQ':
scorer = WtqScorer()
elif self.metric == 'GROUP-ANLS':
scorer = GroupAnlsScorer()
elif self.metric == 'GEVAL':
scorer = GevalScorer()
else:
raise ValueError(self.metric)
return scorer
def filter_properties(self, doc: dict, values: Union[str, List[str], Set[str]]) -> List[str]:
"""Filter the list of properties by provided property name(s).
Args:
doc: document with annotations
values: a property name(s)
Returns:
doc: with filtered annotations
"""
if isinstance(values, str):
values = [values]
doc_copy = deepcopy(doc)
doc_copy['annotations'] = [a for a in doc_copy['annotations'] if a['key'] in values]
return doc_copy
def _evalute(self) -> Tuple[BaseScorer, DefaultDict[str, BaseScorer]]:
"""Evaluate the output file.
Returns:
tuple: general fscorer and a dict with fscorer per label.
"""
label_scorers: DefaultDict[str, BaseScorer] = defaultdict(self.create_scorer)
general_scorer = self.create_scorer()
reference_labels: Set[str] = set()
for ans_items, ref_items in zip(self.answers, self.reference):
if self.ignore_case:
ans_items = self.uppercase_items(ans_items)
ref_items = self.uppercase_items(ref_items)
if general_scorer.support_feature_scores():
reference_labels |= set(a['key'] for a in ref_items['annotations'])
for label in set(item['key'] for item in ref_items['annotations'] + ans_items['annotations']):
if self.property_set and label not in self.property_set:
continue
label_out = self.filter_properties(ans_items, label)
label_ref = self.filter_properties(ref_items, label)
label_scorers[label].add(label_out, label_ref)
if general_scorer.support_feature_scores() and self.property_set:
ans_items = self.filter_properties(ans_items, self.property_set)
ref_items = self.filter_properties(ref_items, self.property_set)
general_scorer.add(ans_items, ref_items)
for label in list(label_scorers.keys()):
if label not in reference_labels:
del label_scorers[label]
return general_scorer, label_scorers
def uppercase_items(self, document: dict) -> dict:
"""Uppercase annotation values.
Args:
document: document with annotations that should be uppercased.
Returns:
document: the same document with uppercased annotations.
"""
for item in document['annotations']:
for value_dict in item['values']:
if 'value' in value_dict:
value_dict['value'] = value_dict['value'].upper()
if 'value_variants' in value_dict:
value_dict['value_variants'] = [variant.upper() for variant in value_dict['value_variants']]
if 'children' in value_dict:
value_dict['children'] = self.uppercase_items({'annotations': value_dict['children']})['annotations']
return document
def line_by_line(self):
"""Compute scores line by line.
Returns:
List: list with scorers.
"""
scores = []
for ans_items, ref_items in zip(self.answers, self.reference):
fscorer = self.create_scorer()
if self.ignore_case:
ans_items = self.uppercase_items(ans_items)
ref_items = self.uppercase_items(ref_items)
fscorer.add(ans_items, ref_items)
scores.append(fscorer.score())
return scores
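# Minimal usage sketch (file names are placeholders; read_jsonl as defined earlier in this dump):
#   reference = read_jsonl('document.jsonl')
#   answers = read_jsonl('predictions_due.jsonl')
#   evaluator = DueEvaluator(reference=reference, answers=answers, ignore_case=True, metric='ANLS')
#   print(evaluator.general_scorer.score())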
from .anls_scorer import AnlsScorer
from .base_scorer import BaseScorer
from .fscorer import FScorer
from .mean_fscorer import MeanFScorer
from .wtq_scorer import WtqScorer
from .group_anls import GroupAnlsScorer
from .geval_scorer import GevalScorer
__all__ = ['AnlsScorer', 'BaseScorer', 'FScorer', 'MeanFScorer', 'WtqScorer', 'GevalScorer', 'GroupAnlsScorer']
import logging
from typing import List
from operator import itemgetter
from .base_scorer import BaseScorer
logger = logging.getLogger(__name__)
class AccuracyScorer(BaseScorer):
"""Accuracy Scorer."""
def __init__(self, threshold: float = 0.5):
self.__scores: List[float] = []
self.threshold = threshold
@property
def scores(self):
return self.__scores
def check_denotation(self, out: list, ref: list) -> bool:
return out == ref
def add(self, out_items: List[dict], ref_items: List[dict]):
"""Add more items for computing corpus level scores.
Args:
out_items: outs from a single document (line)
ref_items: reference of the evaluated document (line)
"""
out_ann = sorted(out_items['annotations'], key=itemgetter('key'))
ref_ann = sorted(ref_items['annotations'], key=itemgetter('key'))
assert [a['key'] for a in out_ann] == [a['key'] for a in ref_ann]
for out, ref in zip(out_ann, ref_ann):
o_values = [v['value'] for v in out['values']]
r_values = [v['value'] for v in ref['values']]
score = int(self.check_denotation(o_values, r_values))
self.__scores.append(score)
def score(self) -> float:
if self.__scores:
return sum(self.__scores) / len(self.__scores)
return 0.0
@classmethod
def support_feature_scores(cls) -> bool:
return False
@classmethod
def metric_name(cls) -> str:
return "Accuracy"
import logging
from typing import List
from operator import itemgetter
import textdistance
from due_evaluator.scorers.base_scorer import BaseScorer
logger = logging.getLogger(__name__)
class AnlsScorer(BaseScorer):
"""ANSL Scorer."""
def __init__(self, threshold: float = 0.5):
self.__scores: List[float] = []
self.threshold = threshold
@property
def scores(self):
return self.__scores
def add(self, out_items: List[dict], ref_items: List[dict]):
"""Add more items for computing corpus level scores.
Args:
out_items: outs from a single document (line)
ref_items: reference of the evaluated document (line)
"""
out_ann = sorted(out_items['annotations'], key=itemgetter('key'))
ref_ann = sorted(ref_items['annotations'], key=itemgetter('key'))
assert [a['key'][:100] for a in out_ann] == [a['key'][:100] for a in ref_ann]
"""try:
# assert [a['key'][:100] for a in out_ann] == [a['key'][:100] for a in ref_ann]
out_keys = [a['key'][:100] for a in out_ann]
ref_keys = [a['key'][:100] for a in ref_ann]
# assert out_keys == ref_keys
for i in range(len(out_keys)):
try:
assert out_keys[i] == ref_keys[i]
except AssertionError as e:
print(out_keys[i])
print(ref_keys[i])
print('==============')
# exit(0)
except AssertionError as e:
print('key of pred and gt unmatched:')
# print('pred:', out_keys)
# print('gt:', ref_keys)
exit(0)"""
for out, ref in zip(out_ann, ref_ann):
assert len(out['values']) == 1
val = out['values'][0]['value']
possible_vals = ref['values'][0]['value_variants']
best_score = max([textdistance.levenshtein.normalized_similarity(val, pos)
for pos in possible_vals])
if 1 - self.threshold >= best_score:
best_score = 0.0
self.__scores.append(best_score)
def score(self) -> float:
if self.__scores:
return sum(self.__scores) / len(self.__scores)
return 0.0
@classmethod
def support_feature_scores(cls) -> bool:
return False
@classmethod
def metric_name(cls) -> str:
return "ANLS"
import abc
from typing import List
class BaseScorer(abc.ABC):
"""Abstract class for scorers."""
@abc.abstractmethod
def add(self, out_items: List[dict], ref_items: List[dict]):
pass
@abc.abstractmethod
def score(self):
pass
@abc.abstractclassmethod
def support_feature_scores(cls) -> bool:
pass
@abc.abstractclassmethod
def metric_name(cls) -> str:
pass
# -*- coding: utf-8 -*-
"""F1 Scorer."""
from dataclasses import dataclass, field
from typing import Any, Dict, List
from due_evaluator.scorers.base_scorer import BaseScorer
@dataclass(eq=False, frozen=True)
class Annotation:
key: str
value: str
value_variants: List[str] = field(default_factory=list)
def __eq__(self, other):
if self.key == other.key:
if self.value == other.value:
return True
elif self.value in other.value_variants:
return True
elif other.value in self.value_variants:
return True
return False
class FScorer(BaseScorer):
"""Corpus level F1 Score evaluator."""
def __init__(self):
"""Initialize class."""
self.__precision = []
self.__recall = []
@classmethod
def from_scorers(cls, scorers: List['FScorer']) -> 'FScorer':
"""Get new scorers that is the ensamble of the scorers.
Args:
scorers: list of scorers
Returns:
FScorer: a new FScorer
"""
new_scorer = cls()
for scorer in scorers:
new_scorer.__precision.extend(scorer.__precision)
new_scorer.__recall.extend(scorer.__recall)
return new_scorer
def flatten_annotations(self, annotations: List[Dict[str, Any]]) -> List[Annotation]:
flatten_items = []
for annotation in annotations:
for value in annotation['values']:
flatten_items.append(Annotation(
key=annotation['key'],
value=value['value'],
value_variants=value['value_variants'] if 'value_variants' in value else []))
return flatten_items
def add(self, out_items: Dict[str, Any], ref_items: Dict[str, Any]):
"""Add more items for computing corpus level scores.
Args:
out_items: outs from a single document (line)
ref_items: reference of the evaluated document (line)
"""
prediction_annotations = self.flatten_annotations(out_items['annotations'])
ref_annotations = self.flatten_annotations(ref_items['annotations'])
ref_annotations_copy = ref_annotations.copy()
indicators = []
for prediction in prediction_annotations:
if prediction in ref_annotations_copy:
indicators.append(1)
ref_annotations_copy.remove(prediction)
else:
indicators.append(0)
self.__add_to_precision(indicators)
indicators = []
prediction_annotations_copy = prediction_annotations.copy()
for ref in ref_annotations:
if ref in prediction_annotations_copy:
indicators.append(1)
prediction_annotations_copy.remove(ref)
else:
indicators.append(0)
self.__add_to_recall(indicators)
def __add_to_precision(self, item: List[int]):
if isinstance(item, list):
self.__precision.extend(item)
else:
self.__precision.append(item)
def __add_to_recall(self, item: List[int]):
if isinstance(item, list):
self.__recall.extend(item)
else:
self.__recall.append(item)
def precision(self) -> float:
"""Compute precision.
Returns:
float: corpus level precision
"""
if self.__precision:
precision = sum(self.__precision) / len(self.__precision)
else:
precision = 0.0
return precision
@property
def precision_support(self):
return self.__precision
@property
def recall_support(self):
return self.__recall
def recall(self) -> float:
"""Compute recall.
Returns:
float: corpus level recall
"""
if self.__recall:
recall = sum(self.__recall) / len(self.__recall)
else:
recall = 0.0
return recall
def f_score(self) -> float:
"""Compute F1 score.
Returns:
float: corpus level F1 score.
"""
precision = self.precision()
recall = self.recall()
if precision or recall:
fscore = 2 * precision * recall / (precision + recall)
else:
fscore = 0.0
return fscore
def false_negative(self) -> int:
"""Return the number of false negatives.
Returns:
int: number of false negatives.
"""
return len(self.__recall) - sum(self.__recall)
def false_positive(self) -> int:
"""Return the number of false positives.
Returns:
int: number of false positives.
"""
return len(self.__precision) - sum(self.__precision)
def true_positive(self) -> int:
"""Return number of true positives.
Returns:
int: number of true positives.
"""
return sum(self.__precision)
def condition_positive(self) -> int:
"""Return number of condition positives.
Returns:
int: number of condition positives.
"""
return len(self.__precision)
def score(self):
return self.f_score()
@classmethod
def support_feature_scores(cls) -> bool:
return True
@classmethod
def metric_name(cls) -> str:
return "F1"
from typing import List
import tempfile
from collections import defaultdict
import os
from due_evaluator.scorers.fscorer import FScorer
from due_evaluator.scorers.base_scorer import BaseScorer
GEVAL_BINARY = os.getenv('GEVAL_BINARY', '/data/shared/bin/geval')
GEVAL_METRIC = os.getenv('GEVAL_METRIC', 'MultiLabel-F1:cN')
class GevalScorer(BaseScorer):
def __init__(self):
self.__ref = tempfile.NamedTemporaryFile('w+t')
self.__out = tempfile.NamedTemporaryFile('w+t')
self.__ref_data = defaultdict(set)
self.__out_data = defaultdict(set)
@staticmethod
def add_to_geval_data(data, line):
name = line['name']
for annotation in line['annotations']:
for idx, val in enumerate(annotation['values'], 1):
for child in val['children']:
new_name = child['key'] + '__' + str(idx) if '__' in child['key'] else child['key']
if child['values'] and child['values'] != ['']:
new_value = '|'.join([v['value'].replace(' ', '_') for v in child['values']])
data[name].add(f'{new_name}={new_value}')
def save_geval_files(self):
for name in sorted(self.__ref_data.keys()):
self.__ref.write(' '.join(self.__ref_data[name]) + '\n')
self.__out.write(' '.join(self.__out_data[name]) + '\n')
def add(self, out_items: List[str], ref_items: List[str]):
self.add_to_geval_data(self.__out_data, out_items)
self.add_to_geval_data(self.__ref_data, ref_items)
def support_feature_scores(cls) -> bool:
return False
def metric_name(cls) -> str:
return "GEVAL"
def run_geval(self):
self.__ref.flush()
self.__out.flush()
try:
return float(os.popen(f'{GEVAL_BINARY} -o {self.__out.name} -e {self.__ref.name} --metric {GEVAL_METRIC}').read())
except:
return -1
def score(self) -> float:
self.save_geval_files()
return self.run_geval()
from typing import Any, List, Dict
import itertools
from dataclasses import dataclass, field
import numpy as np
from scipy.optimize import linear_sum_assignment
import textdistance
from due_evaluator.scorers.fscorer import FScorer
from due_evaluator.scorers.base_scorer import BaseScorer
@dataclass(eq=False, frozen=True)
class FuzzyAnnotation:
key: str
value: str
value_variants: List[str] = field(default_factory=list)
def __eq__(self, other):
def _is_float(val):
try:
float(val)
except ValueError:
return False
return True
def _comp(val, pos) -> float:
if _is_float(val) or _is_float(pos):
return float(val == pos)
return textdistance.levenshtein.normalized_similarity(val, pos)
def _is_acceptable(val, possible_vals, threshold=.5):
best_score = max([_comp(val, pos) for pos in possible_vals] + [0.])
return best_score >= threshold
if self.key == other.key:
if _is_acceptable(other.value, [self.value]):
return True
elif _is_acceptable(self.value, other.value_variants):
return True
elif _is_acceptable(other.value, self.value_variants):
return True
return False
class FuzzyFScorer(FScorer):
def flatten_annotations(self, annotations: List[Dict[str, Any]]) -> List[FuzzyAnnotation]:
flatten_items = []
for annotation in annotations:
for value in annotation['values']:
flatten_items.append(FuzzyAnnotation(
key=annotation['key'],
value=value['value'],
value_variants=value['value_variants'] if 'value_variants' in value else []))
return flatten_items
class GroupAnlsScorer(BaseScorer):
def __init__(self):
self.__inner_scorer = FuzzyFScorer()
def pseudo_documents(self, doc: dict) -> List[dict]:
docs = []
for ann in doc['annotations']:
for val in ann['values']:
assert 'children' in val
docs.append({
'name': '',
'annotations': val['children']
})
return docs
def best_permutation(self, out_items: List[dict], ref_items: List[dict]):
out_items = self.pseudo_documents(out_items)
ref_items = self.pseudo_documents(ref_items)
target_length = max(len(out_items), len(ref_items))
out_items = self.pad(out_items, target_length)
ref_items = self.pad(ref_items, target_length)
matrix = []
for o in out_items:
row = []
for ri, r in enumerate(ref_items):
fscorer = FuzzyFScorer()
fscorer.add(o, r)
row.append(1 - fscorer.f_score())
matrix.append(row)
row_ind, col_ind = linear_sum_assignment(np.array(matrix))
best_out = [out_items[i] for i in row_ind]
best_ref = [ref_items[i] for i in col_ind]
return (best_out, best_ref)
def pad(self, items: List[dict], target_length: int):
for _ in range(target_length - len(items)):
items.append({'name': '', 'annotations': []})
return items
def add(self, out_items: List[str], ref_items: List[str]):
if len(self.pseudo_documents(out_items)) == 0 and len(self.pseudo_documents(ref_items)) == 0:
return
out_perm, ref_perm = self.best_permutation(out_items, ref_items)
for o, r in zip(out_perm, ref_perm):
self.__inner_scorer.add(o, r)
def support_feature_scores(cls) -> bool:
return False
def metric_name(cls) -> str:
return "GROUP-ANLS"
def score(self) -> float:
return self.__inner_scorer.score()
from typing import List
from due_evaluator.scorers.fscorer import FScorer
from due_evaluator.scorers.base_scorer import BaseScorer
class MeanFScorer(BaseScorer):
def __init__(self):
self.__scores: List[float] = []
def add(self, out_items: List[str], ref_items: List[str]):
fscorer = FScorer()
fscorer.add(out_items, ref_items)
self.__scores.append(fscorer.f_score())
def support_feature_scores(cls) -> bool:
return False
def metric_name(cls) -> str:
return "MEAN-F1"
def score(self) -> float:
if self.__scores:
return sum(self.__scores) / len(self.__scores)
return 0.0
"""
Based on the official implementation from:
https://github.com/ppasupat/WikiTableQuestions/blob/master/evaluator.py
"""
import logging
from typing import List
from operator import itemgetter
import re
from math import isnan, isinf
from abc import ABCMeta, abstractmethod
import unicodedata
from due_evaluator.scorers.accuracy_scorer import AccuracyScorer
logger = logging.getLogger(__name__)
def normalize(x):
# Remove diacritics
x = ''.join(c for c in unicodedata.normalize('NFKD', x)
if unicodedata.category(c) != 'Mn')
# Normalize quotes and dashes
x = re.sub(r"[‘’´`]", "'", x)
x = re.sub(r"[“”]", "\"", x)
x = re.sub(r"[‐‑‒–—−]", "-", x)
while True:
old_x = x
# Remove citations
x = re.sub(r"((?<!^)\[[^\]]*\]|\[\d+\]|[•♦†‡*#+])*$", "", x.strip())
# Remove details in parenthesis
x = re.sub(r"(?<!^)( \([^)]*\))*$", "", x.strip())
# Remove outermost quotation mark
x = re.sub(r'^"([^"]*)"$', r'\1', x.strip())
if x == old_x:
break
# Remove final '.'
if x and x[-1] == '.':
x = x[:-1]
# Collapse whitespaces and convert to lower case
x = re.sub(r'\s+', ' ', x, flags=re.U).lower().strip()
return x
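# Illustrative examples of normalize() (can be checked in an interactive session):
#   normalize('“Hello World”') -> 'hello world'  (curly quotes normalized, outer quotes stripped, lowercased)
#   normalize('Answer.') -> 'answer'  (trailing period removed)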
class Value(metaclass=ABCMeta):
# Should be populated with the normalized string
_normalized = None
@abstractmethod
def match(self, other):
"""Return True if the value matches the other value.
Args:
other (Value)
Returns:
a boolean
"""
pass
@property
def normalized(self):
return self._normalized
class StringValue(Value):
def __init__(self, content):
assert isinstance(content, str)
self._normalized = normalize(content)
self._hash = hash(self._normalized)
def __eq__(self, other):
return isinstance(other, StringValue) and self.normalized == other.normalized
def __hash__(self):
return self._hash
def __str__(self):
return 'S' + str([self.normalized])
__repr__ = __str__
def match(self, other):
assert isinstance(other, Value)
return self.normalized == other.normalized
class NumberValue(Value):
def __init__(self, amount, original_string=None):
assert isinstance(amount, (int, float))
if abs(amount - round(amount)) < 1e-6:
self._amount = int(amount)
else:
self._amount = float(amount)
if not original_string:
self._normalized = str(self._amount)  # Python 3: the original Python 2 code used unicode()
else:
self._normalized = normalize(original_string)
self._hash = hash(self._amount)
@property
def amount(self):
return self._amount
def __eq__(self, other):
return isinstance(other, NumberValue) and self.amount == other.amount
def __hash__(self):
return self._hash
def __str__(self):
return ('N(%f)' % self.amount) + str([self.normalized])
__repr__ = __str__
def match(self, other):
assert isinstance(other, Value)
if self.normalized == other.normalized:
return True
if isinstance(other, NumberValue):
return abs(self.amount - other.amount) < 1e-6
return False
@staticmethod
def parse(text):
"""Try to parse into a number.
Return:
the number (int or float) if successful; otherwise None.
"""
try:
return int(text)
except:
try:
amount = float(text)
assert not isnan(amount) and not isinf(amount)
return amount
except:
return None
class DateValue(Value):
def __init__(self, year, month, day, original_string=None):
"""Create a new DateValue. Placeholders are marked as -1."""
assert isinstance(year, int)
assert isinstance(month, int) and (month == -1 or 1 <= month <= 12)
assert isinstance(day, int) and (day == -1 or 1 <= day <= 31)
assert not (year == month == day == -1)
self._year = year
self._month = month
self._day = day
if not original_string:
self._normalized = '{}-{}-{}'.format(
year if year != -1 else 'xx',
month if month != -1 else 'xx',
day if day != -1 else 'xx')
else:
self._normalized = normalize(original_string)
self._hash = hash((self._year, self._month, self._day))
@property
def ymd(self):
return (self._year, self._month, self._day)
def __eq__(self, other):
return isinstance(other, DateValue) and self.ymd == other.ymd
def __hash__(self):
return self._hash
def __str__(self):
return (('D(%d,%d,%d)' % (self._year, self._month, self._day))
+ str([self._normalized]))
__repr__ = __str__
def match(self, other):
assert isinstance(other, Value)
if self.normalized == other.normalized:
return True
if isinstance(other, DateValue):
return self.ymd == other.ymd
return False
@staticmethod
def parse(text):
"""Try to parse into a date.
Return:
tuple (year, month, date) if successful; otherwise None.
"""
try:
ymd = text.lower().split('-')
assert len(ymd) == 3
year = -1 if ymd[0] in ('xx', 'xxxx') else int(ymd[0])
month = -1 if ymd[1] == 'xx' else int(ymd[1])
day = -1 if ymd[2] == 'xx' else int(ymd[2])
assert not (year == month == day == -1)
assert month == -1 or 1 <= month <= 12
assert day == -1 or 1 <= day <= 31
return (year, month, day)
except:
return None
class WtqScorer(AccuracyScorer):
"""WTQ Scorer."""
def __init__(self, threshold: float = 0.5):
self.__scores: List[float] = []
self.threshold = threshold
@property
def scores(self):
return self.__scores
def to_value(self, original_string, corenlp_value=None):
"""Convert the string to Value object.
Args:
original_string (str): Original string
corenlp_value (str): Optional value returned from CoreNLP
Returns:
Value
"""
if isinstance(original_string, Value):
# Already a Value
return original_string
if not corenlp_value:
corenlp_value = original_string
# Number?
amount = NumberValue.parse(corenlp_value)
if amount is not None:
return NumberValue(amount, original_string)
# Date?
ymd = DateValue.parse(corenlp_value)
if ymd is not None:
if ymd[1] == ymd[2] == -1:
return NumberValue(ymd[0], original_string)
else:
return DateValue(ymd[0], ymd[1], ymd[2], original_string)
# String.
return StringValue(original_string)
def to_value_list(self, original_strings, corenlp_values=None):
"""Convert a list of strings to a list of Values
Args:
original_strings (list[str])
corenlp_values (list[str or None])
Returns:
list[Value]
"""
assert isinstance(original_strings, (list, tuple, set))
if corenlp_values is not None:
assert isinstance(corenlp_values, (list, tuple, set))
assert len(original_strings) == len(corenlp_values)
return list(set(self.to_value(x, y) for (x, y)
in zip(original_strings, corenlp_values)))
else:
return list(set(self.to_value(x) for x in original_strings))
def check_denotation(self, predicted_values: list, target_values: list):
"""Return True if the predicted denotation is correct.
Args:
predicted_values (list[Value])
target_values (list[Value])
Returns:
bool
"""
target_values = self.to_value_list(target_values)
predicted_values = self.to_value_list(predicted_values)
# Check size
if len(target_values) != len(predicted_values):
return False
# Check items
for target in target_values:
if not any(target.match(pred) for pred in predicted_values):
return False
return True
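# Illustrative behaviour of check_denotation: ['5.0'] vs ['5'] -> True (both parse to the
# number 5), ['Paris'] vs ['paris'] -> True (string normalization lowercases), and any
# size mismatch between the two value lists -> False.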
def add(self, out_items: List[dict], ref_items: List[dict]):
"""Add more items for computing corpus level scores.
Args:
out_items: outs from a single document (line)
ref_items: reference of the evaluated document (line)
"""
out_ann = sorted(out_items['annotations'], key=itemgetter('key'))
ref_ann = sorted(ref_items['annotations'], key=itemgetter('key'))
assert [a['key'][:100] for a in out_ann] == [a['key'][:100] for a in ref_ann]
for out, ref in zip(out_ann, ref_ann):
o_values = [v['value'] for v in out['values']]
r_values = [v['value'] for v in ref['values']]
score = int(self.check_denotation(o_values, r_values))
self.__scores.append(score)
def score(self) -> float:
if self.__scores:
return sum(self.__scores) / len(self.__scores)
return 0.0
@classmethod
def support_feature_scores(cls) -> bool:
return False
@classmethod
def metric_name(cls) -> str:
return "WTQ"
from due_evaluator.scorers.fscorer import FScorer
from typing import Dict, List, Optional, Sequence, Union
import pandas as pd
from due_evaluator.due_evaluator import DueEvaluator
def dataframe_to_print(df: pd.DataFrame, print_format: Optional[str] = 'text') -> str:
"""Export dataframe to json or plain text.
Args:
df (pd.DataFrame): data
print_format (str, optional): Print format. Defaults to 'text'.
Raises:
ValueError: unknown print_format
Returns:
str: printed version of dataframe
"""
out: str
if print_format == 'latex':
out = df.reset_index().to_latex(index=False)
elif print_format == 'text':
out = df.reset_index().to_string(index=False)
elif print_format == 'json':
out = df.to_json(orient='index')
else:
raise ValueError()
return out
def property_scores_to_string(
dues: List[DueEvaluator], print_format: str = 'text', columns: Sequence[str] = ('Precision', 'Recall', 'F-1'),
) -> str:
"""Print out scores per property.
Args:
dues: List of DueEvaluators
print_format: output format: text or latex
columns: a list of metrics to print
Returns:
str: string table with feature scores.
"""
data = []
for property_name in sorted(dues[0].property_scorers.keys()) + ['ALL']:
row_data: Dict[str, Union[str, float]] = {}
row_data['Label'] = property_name
for due in dues:
if len(dues) == 1:
suffix = ''
else:
suffix = f' ({due.path})'
if property_name == 'ALL':
scorer = due.general_scorer
else:
scorer = due.property_scorers[property_name]
row_data[scorer.metric_name() + suffix] = scorer.score()
if isinstance(scorer, FScorer):
if 'Precision' in columns:
row_data['Precision' + suffix] = scorer.precision()
if 'Recall' in columns:
row_data['Recall' + suffix] = scorer.recall()
data.append(row_data)
df = pd.DataFrame(data)
df.set_index('Label', drop=True, inplace=True)
return dataframe_to_print(df, print_format)
import collections
import itertools
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union
from icecream import ic
import re
from pycocoevalcap.tokenizer.ptbtokenizer import PTBTokenizer
from pycocoevalcap.cider.cider import Cider
from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.rouge.rouge import Rouge
from pycocoevalcap.meteor.meteor import Meteor
import editdistance
"""
this script supports:
ANLS for DocVQA
RelaxedAccuracy for ChartQA
ContainAccuracy for MultimodalOCR LLM zero-shot text-recognition
"""
def anls_metric(target: str, prediction: str, theta: float = 0.5):
"""Calculates ANLS for DocVQA.
There does not seem to be an official evaluation script.
Public implementation on which this implementation is based:
https://github.com/herobd/layoutlmv2/blob/main/eval_docvqa.py#L92
Original paper (see Eq 1): https://arxiv.org/pdf/1907.00490.pdf
Args:
target: Target string.
prediction: Predicted string.
theta: Filter threshold set to 0.5 for DocVQA.
Returns:
ANLS score.
"""
edit_distance = editdistance.eval(target, prediction)
normalized_ld = edit_distance / max(len(target), len(prediction))
return 1.0 - normalized_ld if normalized_ld < theta else 0.0
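# Worked example (illustrative): anls_metric('hello', 'helo') has edit distance 1,
# normalized by max length 5 to 0.2 < 0.5, giving a score of 0.8;
# anls_metric('hello', 'world') has normalized distance 0.8 >= 0.5, giving 0.0.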
def relaxed_correctness(target: str,
prediction: str,
max_relative_change: float = 0.05) -> bool:
"""Calculates relaxed correctness.
The correctness tolerates certain error ratio defined by max_relative_change.
See https://arxiv.org/pdf/2203.10244.pdf, end of section 5.1:
“Following Methani et al. (2020), we use a relaxed accuracy measure for the
numeric answers to allow a minor inaccuracy that may result from the automatic
data extraction process. We consider an answer to be correct if it is within
5% of the gold answer. For non-numeric answers, we still need an exact match
to consider an answer to be correct.”
Args:
target: Target string.
prediction: Predicted string.
max_relative_change: Maximum relative change.
Returns:
Whether the prediction was correct given the specified tolerance.
"""
def _to_float(text: str) -> Optional[float]:
try:
if text.endswith("%"):
# Convert percentages to floats.
return float(text.rstrip("%")) / 100.0
else:
return float(text)
except ValueError:
return None
prediction_float = _to_float(prediction)
target_float = _to_float(target)
if prediction_float is not None and target_float:
relative_change = abs(prediction_float - target_float) / abs(target_float)
return float(relative_change <= max_relative_change)
else:
return float(prediction.lower() == target.lower())
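# Worked example (illustrative): relaxed_correctness('10', '10.4') -> 1.0 (4% relative error),
# relaxed_correctness('10', '10.6') -> 0.0 (6% > 5%), and relaxed_correctness('yes', 'Yes') -> 1.0
# (non-numeric answers fall back to case-insensitive exact match).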
def exact_match(target: str, prediction: str):
return float(target == prediction)
def iou_match(target: list, prediction: list, threshold=0.5):
"""
target/prediction: normalized bbox (list(float)), xyxy
"""
g_x1, g_y1, g_x2, g_y2 = target
p_x1, p_y1, p_x2, p_y2 = prediction
g_w = g_x2 - g_x1
p_w = p_x2 - p_x1
g_h = g_y2 - g_y1
p_h = p_y2 - p_y1
W = (min(g_x2, p_x2)-max(g_x1, p_x1))
H = (min(g_y2, p_y2)-max(g_y1, p_y1))
# the boxes do not overlap if they are disjoint along either axis
if W <= 0 or H <= 0:
return 0.0
Intersection = W*H
Union = g_w*g_h + p_w*p_h - Intersection
# ic(W, H, Intersection, Union)
if Intersection / Union >= threshold:
return 1.0
else:
return 0.0
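# Worked example (illustrative): for target [0.1, 0.1, 0.5, 0.5] and prediction
# [0.1, 0.1, 0.45, 0.5], IoU = 0.14 / 0.16 = 0.875 >= 0.5 so iou_match returns 1.0;
# for prediction [0.3, 0.3, 0.7, 0.7], IoU = 0.04 / 0.28 ≈ 0.14 so it returns 0.0.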
def remove_special_chars_and_lower(s):
pattern = r"[^a-zA-Z0-9\s]"
# print('raw:', s)
s = re.sub(pattern, "", s)
# print('new:', s)
return s.lower()
def contain_match(target:str, prediction:str):
def has_word(sentence, word):
pattern = r"\b" + re.escape(word) + r"\b"
match = re.search(pattern, sentence)
if match:
return True
else:
return False
# print(prediction, target, float(has_word(prediction, target)))
return float(has_word(prediction, target))
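# Worked example (illustrative): contain_match('cat', 'the cat sat') -> 1.0 (whole-word match),
# contain_match('cat', 'concatenate') -> 0.0; for ContainAccuracy, doc_evaluate first applies
# remove_special_chars_and_lower to both target and prediction.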
def cider(
targets: Sequence[Sequence[str]],
predictions: Sequence[str]) -> float:
"""Compute CIDEr score."""
coco_tokenizer = PTBTokenizer()
scorer = Cider()
score, scores = scorer.compute_score(
gts=coco_tokenizer.tokenize({
str(i): [{"caption": t} for t in target]
for i, target in enumerate(targets)
}),
res=coco_tokenizer.tokenize({
str(i): [{"caption": prediction}]
for i, prediction in enumerate(predictions)
}))
score = float(score) * 100.0
scores = [float(s) * 100.0 for s in scores.tolist()]
return score, scores
def rouge(
targets: Sequence[Sequence[str]],
predictions: Sequence[str]) -> float:
"""Compute CIDEr score."""
coco_tokenizer = PTBTokenizer()
scorer = Rouge()
score, scores = scorer.compute_score(
gts=coco_tokenizer.tokenize({
str(i): [{"caption": t} for t in target]
for i, target in enumerate(targets)
}),
res=coco_tokenizer.tokenize({
str(i): [{"caption": prediction}]
for i, prediction in enumerate(predictions)
}))
score = float(score) * 100.0
scores = [float(s) * 100.0 for s in scores.tolist()]
return score, scores
def meteor(
targets: Sequence[Sequence[str]],
predictions: Sequence[str]) -> float:
"""Compute CIDEr score."""
coco_tokenizer = PTBTokenizer()
scorer = Meteor()
score, scores = scorer.compute_score(
gts=coco_tokenizer.tokenize({
str(i): [{"caption": t} for t in target]
for i, target in enumerate(targets)
}),
res=coco_tokenizer.tokenize({
str(i): [{"caption": prediction}]
for i, prediction in enumerate(predictions)
}))
score = float(score) * 100.0
scores = [float(s) * 100.0 for s in scores]
return score, scores
def bleu(
ngram: int,
targets: Sequence[Sequence[str]],
predictions: Sequence[str]) -> float:
"""Compute Bleu score."""
assert ngram <= 4
coco_tokenizer = PTBTokenizer()
scorer = Bleu(4)
score, scores = scorer.compute_score(
gts=coco_tokenizer.tokenize({
str(i): [{"caption": t} for t in target]
for i, target in enumerate(targets)
}),
res=coco_tokenizer.tokenize({
str(i): [{"caption": prediction}]
for i, prediction in enumerate(predictions)
}))
score = score[ngram-1]
scores = scores[ngram-1]
# ic(score)
# ic(scores)
score = float(score) * 100.0
scores = [float(s) * 100.0 for s in scores]
return score, scores
def metric_calculate(
targets: Sequence[Sequence[str]],
predictions: Sequence[str],
metric_fn: Callable[[str, str], Any],
normalize_fn: Callable[[str], str] = lambda v: v):
"""Aggregate target-prediction pair metrics over a dataset."""
assert len(targets) == len(predictions)
total = 0
scores = []
for prediction, target in zip(predictions, targets):
p = normalize_fn(prediction)
score = max(metric_fn(normalize_fn(t), p) for t in target)
scores.append(score)
total += score
score = (100.0 * total) / len(targets)
return score, scores
def doc_evaluate(
metric: str,
targets: Sequence[Sequence[str]],
predictions: Sequence[str]):
"""Calculates evaluation metrics.
Args:
metric: metric name.
targets: list of lists of reference strings (one or more references per sample).
predictions: list of predicted strings.
Returns:
tuple of (corpus-level score, list of per-sample scores).
"""
results = {}
assert metric in ['ExactAccuracy', 'RelaxedAccuracy', 'ANLS', 'ContainAccuracy',
'CIDEr', 'BLEU1', 'BLEU2', 'BLEU3', 'BLEU4', 'RougeL', 'Meteor',
'IOU@0.5']
if metric=='ExactAccuracy': # case sensitive
score, scores = metric_calculate(targets, predictions, metric_fn=exact_match)
elif metric=='IOU@0.5':
score, scores = metric_calculate(targets, predictions, metric_fn=iou_match)
elif metric == 'ANLS':
score, scores = metric_calculate(targets, predictions, metric_fn=anls_metric, normalize_fn=lambda v: v.lower())
elif metric == 'RelaxedAccuracy':
score, scores = metric_calculate(targets, predictions, metric_fn=relaxed_correctness)
elif metric == 'ContainAccuracy':
score, scores = metric_calculate(targets, predictions, metric_fn=contain_match, normalize_fn=remove_special_chars_and_lower)
elif metric == 'CIDEr':
score, scores = cider(targets, predictions)
elif metric == 'BLEU1':
score, scores = bleu(1, targets, predictions)
elif metric == 'BLEU2':
score, scores = bleu(2, targets, predictions)
elif metric == 'BLEU3':
score, scores = bleu(3, targets, predictions)
elif metric == 'BLEU4':
score, scores = bleu(4, targets, predictions)
elif metric == 'RougeL':
score, scores = rouge(targets, predictions)
elif metric == 'Meteor':
score, scores = meteor(targets, predictions)
return score, scores
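# Minimal usage sketch (values illustrative):
#   score, scores = doc_evaluate(metric='ANLS', targets=[['Paris'], ['42']], predictions=['paris', '41'])
#   # here score == 50.0 (corpus-level, 0-100 scale) and scores == [1.0, 0.0] (per sample)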