Commit ce777853 (unverified), authored by Yuanchen, committed by GitHub
Browse files

[feature] ColossalEval: Evaluation Pipeline for LLMs (#4786)



* Add ColossalEval

* Delete evaluate in Chat

---------
Co-authored-by: Xu Yuanchen <yuanchen.xu00@gmail.com>
Co-authored-by: Tong Li <tong.li352711588@gmail.com>
parent 74aa7d96
from .dataset_evaluator import DatasetEvaluator
__all__ = ["DatasetEvaluator"]
from typing import Dict, List
import colossal_eval.evaluate.dataset_evaluator.metrics as metric_helper
import numpy as np
import tqdm
# Metrics dispatched to DatasetEvaluator._calculate_label_metrics.
LabelBasedMetrics = [
    "first_token_accuracy",
    "matthews_correlation",
]

# Metrics dispatched to DatasetEvaluator._calculate_loss_metrics.
LossBasedMetrics = [
    "perplexity",
    "ppl_score",
    "ppl_score_over_choices",
    "per_byte_perplexity",
    "per_byte_ppl_score",
]

# Metrics dispatched to DatasetEvaluator._calculate_combined_metrics.
CombinedMetrics = [
    "combined_single_choice_accuracy",
]

# Metrics dispatched to DatasetEvaluator._calculate_other_metrics, which looks up
# the function of the same name in the metrics helper module.
OtherMetrics = [
    "f1_score",
    "f1_zh_score",
    "rouge_score",
    "rouge_zh_score",
    "retrieval_score",
    "retrieval_zh_score",
    "classification_score",
    "code_sim_score",
    "count_score",
    "multi_choice_accuracy",
    "math_equivalence",
    "single_choice_accuracy",
]
class DatasetEvaluator(object):
    """
    Dataset evaluator.

    Computes, for every requested metric, one score per category plus a
    sample-weighted overall score stored under the "ALL" key of
    ``self.evaluation_results``.
    """

    def __init__(self):
        pass

    def _choice_predictions(self, category: str, reference_labels: List[int]) -> List[int]:
        """Return the predicted choice index for every sample of ``category``.

        The prediction is the argmax over ``sample["softmax_over_choices"]``. When
        that softmax contains NaN, the sample falls back to exact matching on the
        generated output: it receives its own reference label if
        ``metric_helper.single_choice_accuracy`` scores 1 against any target, and
        -1 (which matches no label) otherwise.

        Args:
            category: Category whose samples are scored.
            reference_labels: Index of the correct choice for each sample, in sample order.

        Returns:
            One predicted label per sample.
        """
        all_classes = self.data[category]["inference_kwargs"]["all_classes"]
        warned = False
        predicted = []
        for sample, reference_label in zip(self.data[category]["data"], reference_labels):
            softmax = np.array(list(sample["softmax_over_choices"].values()))
            if not np.any(np.isnan(softmax)):
                predicted.append(np.argmax(softmax))
                continue

            # Only warn once per category.
            if not warned:
                print(
                    f"NaN in the softmax, switch to exact match for category {category} in dataset {self.dataset_name} in model {self.model_name}."
                )
                warned = True

            score = 0
            for ref in sample["target"]:
                score = max(
                    score,
                    metric_helper.single_choice_accuracy(sample["output"], ref, all_classes=all_classes),
                )
            predicted.append(reference_label if score == 1 else -1)
        return predicted

    def _calculate_label_metrics(self, metric: str, category: str):
        """Calculate label-based metrics."""
        samples = self.data[category]["data"]
        weight = len(samples) / self.metric_total_length[metric]

        str_label_map = {
            choice: idx for idx, choice in enumerate(self.data[category]["inference_kwargs"]["all_classes"])
        }
        references = [str_label_map[sample["target"]] for sample in samples]
        softmaxs = self._choice_predictions(category, references)

        scores = np.sum(np.array(references) == np.array(softmaxs)) / len(samples) * 100

        self.evaluation_results[metric][category] = (scores, len(samples))
        self.evaluation_results[metric]["ALL"] += scores * weight

    def _calculate_combined_metrics(self, metric: str, category: str):
        """Calculate combined metrics.

        A sample scores 1 when either the generated output matches a target
        according to the metric function, or the predicted choice label equals
        the label of the first target.
        """
        samples = self.data[category]["data"]
        all_classes = self.data[category]["inference_kwargs"]["all_classes"]
        weight = len(samples) / self.metric_total_length[metric]

        references = [sample["target"] for sample in samples]
        predictions = [sample["output"] for sample in samples]

        str_label_map = {choice: idx for idx, choice in enumerate(all_classes)}
        references_labels = [str_label_map[sample["target"][0]] for sample in samples]
        # The fallback must yield a label index (not the raw target list) so that
        # it is comparable with ``references_labels`` below.
        softmaxs = self._choice_predictions(category, references_labels)

        metric_method = getattr(metric_helper, metric)

        total_score = 0.0
        for prediction, reference, references_label, softmax in zip(
            predictions, references, references_labels, softmaxs
        ):
            score = 0.0
            for ref in reference:
                score = max(score, metric_method(prediction, ref, all_classes=all_classes))
            if references_label == softmax:
                score = 1
            total_score += score
        total_score = total_score * 100 / len(samples)

        self.evaluation_results[metric][category] = (total_score, len(samples))
        self.evaluation_results[metric]["ALL"] += total_score * weight

    def _calculate_other_metrics(self, metric: str, category: str):
        """Calculate other metrics.

        Each sample gets the best score of the metric function over all of its
        targets; the category score is the mean, scaled to 0-100.
        """
        samples = self.data[category]["data"]
        all_classes = self.data[category]["inference_kwargs"]["all_classes"]
        weight = len(samples) / self.metric_total_length[metric]

        references = [sample["target"] for sample in samples]
        predictions = [sample["output"] for sample in samples]

        metric_method = getattr(metric_helper, metric)

        total_score = 0.0
        for prediction, reference in zip(predictions, references):
            score = 0.0
            for ref in reference:
                score = max(score, metric_method(prediction, ref, all_classes=all_classes))
            total_score += score
        total_score = total_score * 100 / len(predictions)

        self.evaluation_results[metric][category] = (total_score, len(samples))
        self.evaluation_results[metric]["ALL"] += total_score * weight

    def _calculate_loss_metrics(self, metric: str, category: str):
        """Calculate perplexity-style metrics from the losses stored on each sample.

        Nothing is recorded for "ppl_score_over_choices" when the category has
        no ``all_classes``.
        """
        samples = self.data[category]["data"]

        if metric == "perplexity":
            weight = len(samples) / self.metric_total_length[metric]
            losses = [min(sample["loss"]) for sample in samples]
            perplexity = np.mean(np.exp(np.array(losses)))

            self.evaluation_results["perplexity"][category] = (perplexity, len(samples))
            self.evaluation_results["perplexity"]["ALL"] += perplexity * weight

        elif metric == "ppl_score":
            weight = len(samples) / self.metric_total_length[metric]
            losses = [min(sample["loss"]) for sample in samples]
            perplexity_score = np.mean(np.exp(-np.array(losses))) * 100

            self.evaluation_results["ppl_score"][category] = (perplexity_score, len(samples))
            self.evaluation_results["ppl_score"]["ALL"] += perplexity_score * weight

        elif metric == "ppl_score_over_choices" and self.data[category]["inference_kwargs"]["all_classes"] is not None:
            weight = len(samples) / self.metric_total_length[metric]
            loss_over_choices = [sample["loss_over_choices"] for sample in samples]
            perplexity_score_over_choices = np.mean(np.exp(-np.array(loss_over_choices))) * 100

            self.evaluation_results["ppl_score_over_choices"][category] = (
                perplexity_score_over_choices,
                len(samples),
            )
            self.evaluation_results["ppl_score_over_choices"]["ALL"] += perplexity_score_over_choices * weight

        elif metric == "per_byte_perplexity":
            weight = len(samples) / self.metric_total_length[metric]
            losses = [min(sample["loss_sum"]) for sample in samples]
            perplexity = np.mean(np.exp(np.array(losses) / np.array(self.N_bytes[category])))

            # NOTE(review): the per-byte metrics store a bare score here, while the
            # other metrics store a (score, sample_count) tuple - confirm with consumers.
            self.evaluation_results["per_byte_perplexity"][category] = perplexity
            self.evaluation_results["per_byte_perplexity"]["ALL"] += perplexity * weight

        elif metric == "per_byte_ppl_score":
            weight = len(samples) / self.metric_total_length[metric]
            losses = [min(sample["loss_sum"]) for sample in samples]
            perplexity_score = np.mean(np.exp(-np.array(losses) / np.array(self.N_bytes[category]))) * 100

            self.evaluation_results["per_byte_ppl_score"][category] = perplexity_score
            self.evaluation_results["per_byte_ppl_score"]["ALL"] += perplexity_score * weight

    def _evaluate(self):
        """Calculate and return evaluation results"""
        for metric in self.metrics:
            if metric in LabelBasedMetrics:
                calculate = self._calculate_label_metrics
            elif metric in LossBasedMetrics:
                calculate = self._calculate_loss_metrics
            elif metric in CombinedMetrics:
                calculate = self._calculate_combined_metrics
            elif metric in OtherMetrics:
                calculate = self._calculate_other_metrics
            else:
                # Metrics outside the known groups are left at their initial value.
                calculate = None

            categories = self.suggested_categories[metric]
            # The context manager closes the progress bar once the metric is done.
            with tqdm.tqdm(desc=f"{self.dataset_name}-{metric}-{self.model_name}", total=len(categories)) as pbar:
                if calculate is not None:
                    for category in categories:
                        calculate(metric, category)
                        pbar.update(1)

        return self.evaluation_results

    def get_evaluation_results(
        self, data: Dict[str, Dict], dataset_name: str, model_name: str, metrics: List[str]
    ):
        """
        Evaluate inference data on the given metrics.

        Args:
            data: Data to be evaluated, keyed by category. Each value holds the
                samples under "data" and the inference settings under "inference_kwargs".
            dataset_name: Name of the dataset
            model_name: Name of the model
            metrics: Metrics used to evaluate.

        Returns:
            The evaluation results, keyed by metric and then by category (plus "ALL").
        """
        self.data = data
        self.dataset_name = dataset_name
        self.model_name = model_name
        self.categories = list(data.keys())
        self.metrics = metrics

        self.evaluation_results = {
            metric: {category: 0 for category in (["ALL"] + self.categories)} for metric in self.metrics
        }

        self.total_length = 0
        self.total_single_choices = 0
        for value in self.data.values():
            self.total_length += len(value["data"])
            if value["inference_kwargs"]["all_classes"] is not None:
                self.total_single_choices += len(value["data"])

        self.metric_total_length = {metric: 0 for metric in self.metrics}
        self.suggested_categories = {metric: [] for metric in self.metrics}

        for metric in self.metrics:
            self.suggested_categories[metric] = metric_helper.metrics4subcategory[self.dataset_name][metric]
            if "ALL" in self.suggested_categories[metric]:
                # "ALL" expands to every category present in the data.
                self.suggested_categories[metric] = self.categories
                self.metric_total_length[metric] = self.total_length
                continue
            for category in self.suggested_categories[metric]:
                self.metric_total_length[metric] += len(self.data[category]["data"])

        if "per_byte_perplexity" in self.metrics or "per_byte_ppl_score" in self.metrics:
            self.N_bytes = {
                category: [sample["byte_num"][0] for sample in self.data[category]["data"]]
                for category in self.categories
            }

        return self._evaluate()
# Code adapted from https://github.com/THUDM/LongBench/blob/main/metrics.py
# Code adapted from https://github.com/hendrycks/math/blob/main/modeling/math_equivalence.py
# Code adapted from https://github.com/ruixiangcui/AGIEval/blob/main/src/evaluation.py
import difflib
import re
import string
from collections import Counter
import jieba
from fuzzywuzzy import fuzz
from rouge import Rouge
# AGIEval categories made of 4-choice questions.
# The commented are non 4-choice questions.
_AGIEVAL_FOUR_CHOICE_CATEGORIES = [
    # "lsat-ar",
    # "lsat-lr",
    # "lsat-rc",
    "logiqa-en",
    "sat-math",
    "sat-en",
    # "aqua-rat",
    "sat-en-without-passage",
    "gaokao-english",
    "logiqa-zh",
    "gaokao-chinese",
    "gaokao-geography",
    "gaokao-history",
    "gaokao-biology",
    "gaokao-chemistry",
]

# GaokaoBench categories scored with the single-choice metrics.
_GAOKAOBENCH_SINGLE_CHOICE_CATEGORIES = [
    "English MCQs",
    "Biology MCQs",
    "Chemistry MCQs",
    "History MCQs",
    "Math I MCQs",
    "Math II MCQs",
    "Political Science MCQs",
]

# Maps dataset name -> metric name -> categories the metric applies to.
# ["ALL"] means every category of the dataset. Each metric gets its own list object.
metrics4subcategory = {
    "pretrain": {
        "perplexity": ["ALL"],
        "ppl_score": ["ALL"],
        "per_byte_perplexity": ["ALL"],
        "per_byte_ppl_score": ["ALL"],
    },
    "agieval": {
        "combined_single_choice_accuracy": list(_AGIEVAL_FOUR_CHOICE_CATEGORIES),
        "first_token_accuracy": list(_AGIEVAL_FOUR_CHOICE_CATEGORIES),
        "single_choice_accuracy": list(_AGIEVAL_FOUR_CHOICE_CATEGORIES),
        "multi_choice_accuracy": ["jec-qa-kd", "jec-qa-ca", "gaokao-physics", "gaokao-mathqa"],
        "math_equivalence": ["gaokao-mathcloze", "math"],
        "perplexity": ["ALL"],
        "ppl_score_over_choices": [
            "lsat-ar",
            "lsat-lr",
            "lsat-rc",
            "logiqa-en",
            "sat-math",
            "sat-en",
            "aqua-rat",
            "sat-en-without-passage",
            "gaokao-english",
            "logiqa-zh",
            "jec-qa-kd",
            "jec-qa-ca",
            "gaokao-chinese",
            "gaokao-geography",
            "gaokao-history",
            "gaokao-biology",
            "gaokao-chemistry",
            "gaokao-physics",
            "gaokao-mathqa",
        ],
        "ppl_score": ["ALL"],
    },
    "cmmlu": {
        "first_token_accuracy": ["ALL"],
        "single_choice_accuracy": ["ALL"],
        "perplexity": ["ALL"],
        "ppl_score_over_choices": ["ALL"],
        "ppl_score": ["ALL"],
    },
    "gaokaobench": {
        "combined_single_choice_accuracy": list(_GAOKAOBENCH_SINGLE_CHOICE_CATEGORIES),
        "first_token_accuracy": list(_GAOKAOBENCH_SINGLE_CHOICE_CATEGORIES),
        "single_choice_accuracy": list(_GAOKAOBENCH_SINGLE_CHOICE_CATEGORIES),
        "multi_choice_accuracy": [
            "Chinese Lang and Usage MCQs",
            "Chinese Modern Lit",
            "English Fill in Blanks",
            "English Reading Comp",
            "Geography MCQs",
            "Physics MCQs",
            "English Cloze Test",
        ],
        "math_equivalence": ["Math I Fill-in-the-Blank", "Math II Fill-in-the-Blank"],
        "rouge_score": ["English Language Cloze Passage"],
        "rouge_zh_score": [
            "Chinese Language Famous Passages and Sentences Dictation",
            "Chemistry Open-ended Questions",
            "History Open-ended Questions",
            "Biology Open-ended Questions",
            "Political Science Open-ended Questions",
            "English Language Error Correction",
            "Chinese Language Language and Writing Skills Open-ended Questions",
            "Math II Open-ended Questions",
            "Chinese Language Literary Text Reading",
            "Chinese Language Ancient Poetry Reading",
            "Chinese Language Classical Chinese Reading",
            "Physics Open-ended Questions",
            "Math I Open-ended Questions",
            "Geography Open-ended Questions",
            "Chinese Language Practical Text Reading",
        ],
        "perplexity": ["ALL"],
        "ppl_score_over_choices": ["ALL"],
        "ppl_score": ["ALL"],
    },
    "longbench": {
        "f1_score": ["hotpotqa", "2wikimqa", "musique", "narrativeqa", "qasper", "multifieldqa_en", "triviaqa"],
        "f1_zh_score": ["multifieldqa_zh"],
        "rouge_score": ["gov_report", "qmsum", "multi_news", "samsum"],
        "rouge_zh_score": ["dureader", "vcsum"],
        "retrieval_score": ["passage_retrieval_en"],
        "retrieval_zh_score": ["passage_retrieval_zh"],
        "classification_score": ["trec", "lsht"],
        "code_sim_score": ["lcc", "repobench-p"],
        "count_score": ["passage_count"],
        "perplexity": ["ALL"],
        "ppl_score": ["ALL"],
    },
    "mmlu": {
        "first_token_accuracy": ["ALL"],
        "single_choice_accuracy": ["ALL"],
        "accuracy": ["ALL"],
        "perplexity": ["ALL"],
        "ppl_score_over_choices": ["ALL"],
        "ppl_score": ["ALL"],
    },
}
def _fix_fracs(string):
substrs = string.split("\\frac")
new_str = substrs[0]
if len(substrs) > 1:
substrs = substrs[1:]
for substr in substrs:
new_str += "\\frac"
if substr[0] == "{":
new_str += substr
else:
try:
assert len(substr) >= 2
except:
return string
a = substr[0]
b = substr[1]
if b != "{":
if len(substr) > 2:
post_substr = substr[2:]
new_str += "{" + a + "}{" + b + "}" + post_substr
else:
new_str += "{" + a + "}{" + b + "}"
else:
if len(substr) > 2:
post_substr = substr[2:]
new_str += "{" + a + "}" + b + post_substr
else:
new_str += "{" + a + "}" + b
string = new_str
return string
def _fix_a_slash_b(string):
if len(string.split("/")) != 2:
return string
a = string.split("/")[0]
b = string.split("/")[1]
try:
a = int(a)
b = int(b)
assert string == "{}/{}".format(a, b)
new_string = "\\frac{" + str(a) + "}{" + str(b) + "}"
return new_string
except:
return string
def _remove_right_units(string):
# "\\text{ " only ever occurs (at least in the val set) when describing units
if "\\text{ " in string:
splits = string.split("\\text{ ")
assert len(splits) == 2
return splits[0]
else:
return string
def _fix_sqrt(string):
if "\\sqrt" not in string:
return string
splits = string.split("\\sqrt")
new_string = splits[0]
for split in splits[1:]:
if split[0] != "{":
a = split[0]
new_substr = "\\sqrt{" + a + "}" + split[1:]
else:
new_substr = "\\sqrt" + split
new_string += new_substr
return new_string
def _strip_string(string):
    r"""Normalize a LaTeX answer string before comparing two answers.

    Applies, in order: removal of line breaks, ``\!``, ``\left``/``\right``,
    degree marks, ``\$``, right-hand units and ``\%``; rewriting of
    ``tfrac``/``dfrac`` to ``frac``; a leading zero for bare decimals; removal
    of a short ``x =`` prefix; brace fixes for ``\sqrt`` and ``\frac``; removal
    of spaces; and conversion of ``0.5`` and ``a/b`` to ``\frac`` form.

    Args:
        string: Answer string to normalize.

    Returns:
        The normalized string.
    """
    # linebreaks
    string = string.replace("\n", "")

    # remove inverse spaces
    string = string.replace("\\!", "")

    # replace \\ with \
    string = string.replace("\\\\", "\\")

    # replace tfrac and dfrac with frac
    string = string.replace("tfrac", "frac")
    string = string.replace("dfrac", "frac")

    # remove \left and \right
    string = string.replace("\\left", "")
    string = string.replace("\\right", "")

    # Remove circ (degrees)
    string = string.replace("^{\\circ}", "")
    string = string.replace("^\\circ", "")

    # remove dollar signs
    string = string.replace("\\$", "")

    # remove units (on the right)
    string = _remove_right_units(string)

    # remove percentage
    string = string.replace("\\%", "")

    # " 0." equivalent to " ." and "{0." equivalent to "{." Alternatively, add "0" if "." is the start of the string
    string = string.replace(" .", " 0.")
    string = string.replace("{.", "{0.")
    # if empty, return empty string
    if len(string) == 0:
        return string
    if string[0] == ".":
        string = "0" + string

    # to consider: get rid of e.g. "k = " or "q = " at beginning
    sides = string.split("=")
    if len(sides) == 2 and len(sides[0]) <= 2:
        string = sides[1]

    # fix sqrt3 --> sqrt{3}
    string = _fix_sqrt(string)

    # remove spaces
    string = string.replace(" ", "")

    # \frac1b or \frac12 --> \frac{1}{b} and \frac{1}{2}, etc. Even works with \frac1{72} (but not \frac{72}1). Also does a/b --> \\frac{a}{b}
    string = _fix_fracs(string)

    # manually change 0.5 --> \frac{1}{2}
    if string == "0.5":
        string = "\\frac{1}{2}"

    # NOTE: X/Y changed to \frac{X}{Y} in dataset, but in simple cases fix in case the model output is X/Y
    string = _fix_a_slash_b(string)

    return string
def parse_math_answer(raw_string):
    r"""Extract the final answer from a free-form math solution.

    The answer is taken, in order of preference, from the last ``\boxed{...}``
    expression, from the last ``$...$`` span, from the text after the last
    ``=``, or from the last number in the text.

    Args:
        raw_string: Model output to parse.

    Returns:
        The extracted answer string, or None when nothing could be extracted.
    """

    def remove_boxed(s):
        # Strip the "\boxed{" ... "}" wrapper; None when s is not such an expression.
        left = "\\boxed{"
        if s is None or not s.startswith(left) or not s.endswith("}"):
            return None
        answer = s[len(left) : -1]
        if "=" in answer:
            answer = answer.split("=")[-1].lstrip(" ")
        return answer

    def last_boxed_only_string(string):
        # Return the last \boxed (or \fbox) expression up to its matching brace.
        idx = string.rfind("\\boxed")
        if idx < 0:
            idx = string.rfind("\\fbox")
            if idx < 0:
                return None

        num_left_braces_open = 0
        for i in range(idx, len(string)):
            if string[i] == "{":
                num_left_braces_open += 1
            elif string[i] == "}":
                num_left_braces_open -= 1
                if num_left_braces_open == 0:
                    return string[idx : i + 1]
        return None

    def get_answer_with_dollar_sign(s):
        last_match = None
        matches = re.findall(r"\$(.*)\$", s)
        if matches:
            last_match = matches[-1]
            if "=" in last_match:
                last_match = last_match.split("=")[-1].lstrip(" ")
        return last_match

    def get_answer_without_dollar_sign(s):
        last_match = None
        if "=" in s:
            last_match = s.split("=")[-1].lstrip(" ").rstrip(".")
            # Cut at a literal backslash-n sequence, as opposed to a real newline.
            if "\\n" in last_match:
                last_match = last_match.split("\\n")[0]
        else:
            matches = re.findall(r"(?:\$)?\d+(?:\.\d+)?(?![\w\d])", s)
            if matches:
                last_match = matches[-1]
        return last_match

    if "\\boxed" in raw_string:
        answer = remove_boxed(last_boxed_only_string(raw_string))
    else:
        answer = get_answer_with_dollar_sign(raw_string)
        if not answer:
            answer = get_answer_without_dollar_sign(raw_string)
    return answer
def math_equivalence(prediction, reference, **kwargs):
    """Check whether the answer parsed from ``prediction`` equals ``reference``.

    Both sides are normalized with ``_strip_string`` before comparison. If
    normalization fails, the parsed prediction and the reference are compared
    as they are.

    Args:
        prediction: Raw model output; its final answer is extracted first.
        reference: Reference answer.
        **kwargs: Unused; accepted for a uniform metric signature.

    Returns:
        True when the answers are considered equal, False otherwise (also when
        either side is None).
    """
    prediction = parse_math_answer(prediction)

    if prediction is None and reference is None:
        print("WARNING: Both None")
        return False

    if prediction is None or reference is None:
        return False

    try:
        return _strip_string(prediction) == _strip_string(reference)
    except Exception:
        # Best effort: malformed LaTeX must not abort the evaluation run.
        return prediction == reference
def multi_choice_accuracy(prediction, reference, **kwargs):
    """Score a multiple-answer choice question.

    Returns 0.0 as soon as the prediction contains a choice that is not in the
    reference; otherwise each predicted choice earns an equal share of 1.
    """
    all_classes = kwargs.get("all_classes", None)
    # Only find uppercase letters not surrounded by lowercase letters
    letters = f"{all_classes[0]}-{all_classes[-1]}" if all_classes else "A-F"
    pattern = f"(?<![a-z])[{letters}](?![a-z])"

    predicted_choices = set(re.findall(pattern, prediction))
    correct_choices = set(re.findall(pattern, reference))

    if not predicted_choices <= correct_choices:
        return 0.0

    score = 0.0
    for _ in predicted_choices:
        score += 1 / len(correct_choices)
    return score
def combined_single_choice_accuracy(prediction, reference, **kwargs):
    """Score the generated text exactly like ``single_choice_accuracy``."""
    return single_choice_accuracy(prediction, reference, **kwargs)
def single_choice_accuracy(prediction, reference, **kwargs):
    """Return 1.0 when the first choice letter in ``prediction`` is the reference choice, else 0.0.

    Asserts that ``reference`` contains exactly one choice letter.
    """
    all_classes = kwargs.get("all_classes", None)
    # Only find uppercase letters not surrounded by lowercase letters
    letters = f"{all_classes[0]}-{all_classes[-1]}" if all_classes else "A-F"
    pattern = f"(?<![a-z])[{letters}](?![a-z])"

    first_predicted = re.findall(pattern, prediction)[0:1]
    expected = re.findall(pattern, reference)
    assert len(expected) == 1

    return 1.0 if first_predicted == expected else 0.0
def normalize_answer(s):
    """Lower text and remove punctuation, articles and extra whitespace."""
    lowered = s.lower()
    without_punctuation = lowered.translate(str.maketrans("", "", string.punctuation))
    without_articles = re.sub(r"\b(a|an|the)\b", " ", without_punctuation)
    # Collapse every run of whitespace into a single space.
    return " ".join(without_articles.split())
def normalize_zh_answer(s):
    """Lower text and remove punctuation, extra whitespace.

    Both ASCII punctuation and full-width / CJK punctuation are removed, and
    all whitespace is dropped (Chinese text is not space separated).
    """
    # The full-width forms must stay full-width: normalizing them to ASCII would
    # put a bare double quote inside the literal and truncate it.
    cn_punctuation = "！？｡。＂＃＄％＆＇（）＊＋，－／：；＜＝＞＠［＼］＾＿｀｛｜｝～｟｠｢｣､、〃》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏."
    all_punctuation = set(string.punctuation + cn_punctuation)

    without_punctuation = "".join(ch for ch in s.lower() if ch not in all_punctuation)
    return "".join(without_punctuation.split())
def count_score(prediction, reference, **kwargs):
    """Return the fraction of numbers in ``prediction`` that equal ``reference``.

    The score is 0.0 when ``prediction`` contains no number.
    """
    found = re.findall(r"\d+", prediction)
    if not found:
        return 0.0
    hits = found.count(str(reference))
    return float(hits / len(found))
def retrieval_score(prediction, reference, **kwargs):
    """Return the fraction of numbers in ``prediction`` that equal the paragraph id in ``reference``.

    The id is read from the first "Paragraph <n>" in ``reference``; the score
    is 0.0 when ``prediction`` contains no number.
    """
    ground_truth_id = re.findall(r"Paragraph (\d+)", reference)[0]
    candidates = re.findall(r"\d+", prediction)
    if not candidates:
        return 0.0
    return float(candidates.count(ground_truth_id) / len(candidates))
def retrieval_zh_score(prediction, reference, **kwargs):
    """Return the fraction of numbers in ``prediction`` that equal the paragraph id in ``reference``.

    Chinese counterpart of the English retrieval metric: the id is read from
    the first "段落<n>" in ``reference``; the score is 0.0 when ``prediction``
    contains no number.
    """
    ground_truth_id = re.findall(r"段落(\d+)", reference)[0]
    candidates = re.findall(r"\d+", prediction)
    if not candidates:
        return 0.0
    return float(candidates.count(ground_truth_id) / len(candidates))
def code_sim_score(prediction, reference, **kwargs):
    """Return the fuzzy similarity (0-1) between the first code line of ``prediction`` and ``reference``.

    The first code line is the first line containing none of "`", "#" or "//";
    an empty string is compared when no such line exists.
    """
    skip_markers = ("`", "#", "//")
    first_code_line = next(
        (
            line
            for line in prediction.lstrip("\n").split("\n")
            if not any(marker in line for marker in skip_markers)
        ),
        "",
    )
    return fuzz.ratio(first_code_line, reference) / 100
def classification_score(prediction, reference, **kwargs):
    """Score a classification answer by the class names found in ``prediction``.

    Every class name that occurs in ``prediction`` counts as a match, except
    names that are merely a proper substring of ``reference``. The score is
    ``1 / number_of_matches`` when ``reference`` is among the matches and 0.0
    otherwise.

    Args:
        prediction: Model output.
        reference: Name of the correct class.
        **kwargs: Must contain ``all_classes``, the list of candidate class names.

    Returns:
        The score as a float between 0.0 and 1.0.
    """
    all_classes = kwargs["all_classes"]
    # Build the filtered list in one pass; removing from a list while iterating
    # over it skips the element that follows each removal.
    em_match_list = [
        class_name
        for class_name in all_classes
        if class_name in prediction and not (class_name in reference and class_name != reference)
    ]
    # The former difflib fallback was guarded by ``em_match_list != 0``, which is
    # always true for a list, so it never ran; without any match the score is 0.0.
    if reference in em_match_list:
        return 1.0 / len(em_match_list)
    return 0.0
def rouge_score(prediction, reference, **kwargs):
    """Return the ROUGE-L F-score between ``prediction`` and ``reference``.

    Args:
        prediction: Model output.
        reference: Reference text.
        **kwargs: Unused; accepted for a uniform metric signature.

    Returns:
        The "rouge-l" "f" value, or 0.0 when the scorer raises.
    """
    rouge = Rouge()
    try:
        scores = rouge.get_scores([prediction], [reference], avg=True)
    except Exception:
        # Best effort: a sample the scorer cannot handle scores zero, but
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        return 0.0
    return scores["rouge-l"]["f"]
def rouge_zh_score(prediction, reference, **kwargs):
    """Return the ROUGE-L F-score of Chinese text after jieba word segmentation.

    Both texts are segmented and re-joined with spaces so that the ROUGE
    scorer sees word boundaries.
    """
    segmented_prediction = " ".join(jieba.cut(prediction, cut_all=False))
    segmented_reference = " ".join(jieba.cut(reference, cut_all=False))
    return rouge_score(segmented_prediction, segmented_reference)
def _f1_score(prediction, reference, **kwargs):
common = Counter(prediction) & Counter(reference)
num_same = sum(common.values())
if num_same == 0:
return 0
precision = 1.0 * num_same / len(prediction)
recall = 1.0 * num_same / len(reference)
f1 = (2 * precision * recall) / (precision + recall)
return f1
def f1_score(prediction, reference, **kwargs):
    """Return the token-level F1 between the normalized prediction and reference.

    Both texts are normalized with ``normalize_answer`` and split on whitespace.
    """
    prediction_tokens = normalize_answer(prediction).split()
    ground_truth_tokens = normalize_answer(reference).split()
    return _f1_score(prediction_tokens, ground_truth_tokens)
def f1_zh_score(prediction, reference, **kwargs):
    """Return the token-level F1 for Chinese text.

    Both texts are segmented with jieba; each token is normalized with
    ``normalize_zh_answer`` and tokens that end up empty are dropped.
    """

    def tokenize(text):
        normalized = (normalize_zh_answer(token) for token in jieba.cut(text, cut_all=False))
        return [token for token in normalized if len(token) > 0]

    return _f1_score(tokenize(prediction), tokenize(reference))
import os
from typing import Any, Dict, List
import gpt_evaluate
import metrics
import unieval
from utils import analyze_automatic_results, get_data_per_category, save_automatic_results
import colossal_eval.evaluate.gpt_evaluate as gpt_evaluate
from .utils import get_data_per_category
class Evaluator(object):
"""
A class named Evaluator includes GPT-3.5/GPT-4 evaluation
and automatic evaluation
"""
......@@ -21,7 +19,6 @@ class Evaluator(object):
gpt_evaluation_prompt: Dict[str, Any],
gpt_model: str,
language: str,
path_for_UniEval: Dict[str, str],
gpt_with_reference: bool,
) -> None:
self.params = params
......@@ -29,10 +26,7 @@ class Evaluator(object):
self.gpt_evaluation_prompt = gpt_evaluation_prompt
self.gpt_model = gpt_model
self.language = language
self.path_for_UniEval = path_for_UniEval
self.gpt_with_reference = gpt_with_reference
self.automatic_metric_stats = dict()
self.unieval_metric_stats = dict()
self.gpt_evaluation_results = dict()
self.battle_results = []
......@@ -43,7 +37,7 @@ class Evaluator(object):
self.battle_results = gpt_evaluate.battle(answers1, answers2, self.battle_prompt)
def evaluate(self, answers: List[Dict], targets: List[Dict]) -> None:
def evaluate(self, answers: List[Dict], targets: List[Dict], save_path: str, model_name: str) -> None:
"""
A comprehensive evaluation of the answers from the model.
The function evaluates the model's performance from different perspectives
......@@ -53,102 +47,9 @@ class Evaluator(object):
"""
def switch(metric, language):
if metric == "BLEU":
return metrics.bleu_score(preds=predicts_list, targets=targets_list, language=language)
elif metric == "ROUGE":
return metrics.rouge_score(preds=predicts_list, targets=targets_list, language=language)
elif metric == "Distinct":
return metrics.distinct_score(preds=predicts_list, language=language)
elif metric == "BERTScore":
return metrics.bert_score(preds=predicts_list, targets=targets_list, language=language)
elif metric == "Precision":
return metrics.precision(preds=predicts_list, targets=targets_list, language=language)
elif metric == "Recall":
return metrics.recall(preds=predicts_list, targets=targets_list, language=language)
elif metric == "F1 score":
return metrics.F1_score(preds=predicts_list, targets=targets_list, language=language)
elif metric == "CHRF":
return metrics.chrf_score(preds=predicts_list, targets=targets_list, language=language)
else:
raise ValueError(f"Unexpected metric")
answers_per_category = get_data_per_category(answers, list(self.params.keys()))
targets_per_category = get_data_per_category(targets, list(self.params.keys()))
# automatic evaluation
for category in self.params:
if len(answers_per_category[category]) == 0:
print(f"Category {category} specified in your config doesn't have corresponding answers!")
continue
if self.params[category].get("Metrics", None) is None:
continue
category_metrics = self.params[category]["Metrics"]
self.automatic_metric_stats[category] = {}
targets_list = [
target["target"] if target["target"] else target["output"] for target in targets_per_category[category]
]
predicts_list = [answer["output"] for answer in answers_per_category[category]]
for metric in category_metrics:
self.automatic_metric_stats[category].update(switch(metric=metric, language=self.language))
# UniEval evaluation
# self.unieval_metric_stats's key is "task" instead of "category".
# Iterating "task" first will avoid repeated loading models because one task corresponds to one UniEval model.
# If key is "category", different models will be loaded for multiple times across categories because the user may require different task(models) to evaluate one category.
for category in self.params:
if len(answers_per_category[category]) == 0:
print(f"Category {category} specified in your config doesn't have corresponding answers!")
continue
if self.params[category].get("UniEval", None) is None:
continue
if self.params[category]["UniEval"] and self.language == "cn":
raise Exception(
"UniEval doesn't support Chinese! Please remove UniEval config in your Chinese config file."
)
category_metrics = self.params[category]["UniEval"]
for task, metric in [tuple(category_metric.split("-")) for category_metric in category_metrics]:
if self.unieval_metric_stats.get(task, None) is None:
self.unieval_metric_stats[task] = {category: {metric: 0}}
elif self.unieval_metric_stats[task].get(category, None) is None:
self.unieval_metric_stats[task][category] = {metric: 0}
else:
self.unieval_metric_stats[task][category][metric] = 0
for task in self.unieval_metric_stats:
if self.path_for_UniEval is None:
raise Exception(f"Please specify the path for UniEval model in the config file!")
if self.path_for_UniEval.get(task, None) is None:
raise Exception(f"Please specify the model path for task {task} in the config file!")
print(f"Load UniEval model for task {task}.")
uni_evaluator = unieval.get_evaluator(task, model_name_or_path=self.path_for_UniEval[task])
for category in self.unieval_metric_stats[task]:
targets_list = [
target["target"] if target["target"] else target["output"]
for target in targets_per_category[category]
]
predicts_list = [answer["output"] for answer in answers_per_category[category]]
sources_list = [answer["instruction"] + answer["input"] for answer in answers_per_category[category]]
data = unieval.convert_data_to_unieval_format(predicts_list, sources_list, targets_list)
scores = uni_evaluator.evaluate(
data, category, dims=list(self.unieval_metric_stats[task][category].keys()), overall=False
)
avg_scores = unieval.calculate_average_score(scores)
self.unieval_metric_stats[task][category].update(avg_scores)
# gpt evaluation
for category in self.params:
if len(answers_per_category[category]) == 0:
......@@ -170,6 +71,8 @@ class Evaluator(object):
prompt,
category_metrics,
category,
save_path,
model_name,
self.gpt_model,
self.language,
references=targets_per_category[category] if self.gpt_with_reference else None,
......@@ -185,28 +88,6 @@ class Evaluator(object):
save_path = os.path.join(path, "gpt_evaluate", "battle_results")
gpt_evaluate.save_battle_results(self.battle_results, model_name_list[0], model_name_list[1], save_path)
else:
if self.automatic_metric_stats:
# Save evaluation results for automatic metrics
automatic_base_save_path = os.path.join(path, "automatic_results")
automatic_results_save_path = os.path.join(automatic_base_save_path, "evaluation_results")
save_automatic_results(model_name_list[0], self.automatic_metric_stats, automatic_results_save_path)
# Save charts and csv.
automatic_analyses_save_path = os.path.join(automatic_base_save_path, "evaluation_analyses")
analyze_automatic_results(automatic_results_save_path, automatic_analyses_save_path)
if self.unieval_metric_stats:
# Save evaluation results for UniEval metrics
unieval_base_save_path = os.path.join(path, "unieval_results")
unieval_results_save_path = os.path.join(unieval_base_save_path, "evaluation_results")
unieval.save_unieval_results(model_name_list[0], self.unieval_metric_stats, unieval_results_save_path)
# Save charts and csv.
unieval_analyses_save_path = os.path.join(unieval_base_save_path, "evaluation_analyses")
unieval.analyze_unieval_results(unieval_results_save_path, unieval_analyses_save_path)
if self.gpt_evaluation_results:
# Save evaluation results for GPT evaluation metrics.
gpt_base_save_path = os.path.join(path, "gpt_evaluate", "gpt_evaluate_results")
......
def get_data_per_category(data, categories):
    """Group ``data`` items by their "category" field.

    Args:
        data: Items, each carrying a "category" key.
        categories: Categories to keep.

    Returns:
        A dict with one list per requested category (empty when no item
        belongs to it); items of other categories are dropped.
    """
    grouped = {name: [] for name in categories}
    for record in data:
        name = record["category"]
        if name not in categories:
            continue
        grouped[name].append(record)
    return grouped
from .base import BaseModel
from .chatglm import ChatGLM2Model, ChatGLMModel
from .huggingface import HuggingFaceCausalLM, HuggingFaceModel
__all__ = ["BaseModel", "HuggingFaceModel", "HuggingFaceCausalLM", "ChatGLMModel", "ChatGLM2Model"]
from abc import abstractclassmethod, abstractmethod
from typing import Dict, List

from colossal_eval.utils import Conversation, prompt_templates

from colossalai.logging import DistributedLogger
class BaseModel:
    """
    Base class for model wrapper.

    Args:
        path: The path to the model.
        model_max_length: The maximum sequence length of the model.
        prompt_template: The model's prompt template. Falls back to the "plain" template when not given.
        batch_size: Batch size for inference.
        logger: Logger for the model.
    """

    def __init__(
        self,
        path: str,
        model_max_length: int = 2048,
        prompt_template: Conversation = None,
        batch_size: int = 1,
        logger: DistributedLogger = None,
    ):
        self.path = path
        self.model_max_length = model_max_length
        self.prompt_template = prompt_template if prompt_template else prompt_templates["plain"]
        self.batch_size = batch_size
        self.logger = logger

    # These hooks take ``self``, so they are marked with ``abstractmethod`` rather
    # than the deprecated ``abstractclassmethod``, which would bind them to the class.
    @abstractmethod
    def inference(self, data: List[Dict]) -> None:
        """
        Infer the given data.
        This function will call self.generate() to get model outputs and also self.model(input) to get logits.

        Args:
            data: The data for inference.
        """

    @abstractmethod
    def generate(self, inputs: List[str], max_new_tokens: int) -> List[str]:
        """
        Generate results given a list of inputs.

        Args:
            inputs: A list of strings.
            max_new_tokens: The maximum length of the output.

        Returns:
            A list of generated strings.
        """

    @abstractmethod
    def get_loss(self, batch: List[str], batch_target: List[str]) -> List[float]:
        """
        Get loss given batch and batch with target.
        Use their length difference after tokenization to mask the loss and only compute loss at target tokens.

        Args:
            batch: batch prompt without target answer.
            batch_target: batch prompt with target answer.

        Returns:
            A list of loss.
        """

    def to(self, device):
        """Move ``self.model`` to ``device``; ``self.model`` is not set in this class's ``__init__``."""
        self.model.to(device)
This diff is collapsed.
This diff is collapsed.
from .conversation import Conversation, get_batch_prompt, prompt_templates
from .utilities import get_json_list, is_rank_0, jdump, jload
__all__ = ["Conversation", "prompt_templates", "get_batch_prompt", "is_rank_0", "jload", "jdump", "get_json_list"]
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment