import json
import re

import numpy as np
import transformers.data.metrics.squad_metrics as squad_metrics
from best_download import download_file
from scipy.optimize import linear_sum_assignment
from pathlib import Path
from zipfile import ZipFile

from lm_eval.base import Task, rf
from lm_eval.metrics import mean


class DROP(Task):
    DATAFOLDER = Path("data/drop")
    URL = "https://s3-us-west-2.amazonaws.com/allennlp/datasets/drop/drop_dataset.zip"

    def download(self):
        if self.DATAFOLDER.exists():
            return
        self.DATAFOLDER.mkdir(parents=True)
        download_file(self.URL, to=str(self.DATAFOLDER / "drop_dataset.zip"))
        with ZipFile(self.DATAFOLDER / "drop_dataset.zip", "r") as zf:
            zf.extractall(self.DATAFOLDER)

    def has_training_docs(self):
        return True

    def has_validation_docs(self):
        return True

    def has_test_docs(self):
        return False

    def fewshot_description(self):
        # TODO: figure out description
        return ""

    def _load_docs(self, docs):
        # Flatten each passage's QA pairs into one document per question.
        for doc in docs:
            for qa in doc["qa_pairs"]:
                yield {
                    "id": qa["query_id"],
                    "passage": doc["passage"],
                    "question": qa["question"],
                    "answers": self.get_answers(qa["answer"]),
                }

    @classmethod
    def get_answers(cls, answers):
        # DROP answers are given as a number, a list of spans, or a date;
        # the first non-empty field wins.
        # NOTE: We wrap every non-`list` answer into a list for uniformity.
        if answers["number"] != "":
            return [str(answers["number"])]
        if answers["spans"] != []:
            return answers["spans"]
        return [
            " ".join(
                [answers["date"]["day"], answers["date"]["month"], answers["date"]["year"]]
            ).strip()
        ]

    def training_docs(self):
        with open(self.DATAFOLDER / "drop_dataset" / "drop_dataset_train.json") as f:
            docs = json.load(f)
        return self._load_docs([docs[k] for k in docs.keys()])

    def validation_docs(self):
        with open(self.DATAFOLDER / "drop_dataset" / "drop_dataset_dev.json") as f:
            docs = json.load(f)
        return self._load_docs([docs[k] for k in docs.keys()])

    def test_docs(self):
        pass

    def doc_to_text(self, doc):
        return f"Passage: {doc['passage']}\nQuestion: {doc['question']}\nAnswer:"

    def doc_to_target(self, doc):
        return " " + ", ".join(doc["answers"])

    def construct_requests(self, doc, ctx):
        """Uses RequestFactory to construct Requests and returns an iterable of
        Requests which will be sent to the LM.

        :param doc:
            The document as returned from training_docs, validation_docs, or test_docs.
        :param ctx: str
            The context string, generated by fewshot_context. This includes the natural
            language description, as well as the few shot examples, and the question
            part of the document for `doc`.
        """
        # One greedy-until request per gold answer, each stopped at the first period.
        conts = []
        for _ in doc["answers"]:
            conts.append(rf.greedy_until(ctx, ["."]))
        return conts

    def process_results(self, doc, results):
        """Take a single document and the LM results, evaluate them, and return a
        dict where keys are the names of submetrics and values are the values of
        the metric for that one document.

        :param doc:
            The document as returned from training_docs, validation_docs, or test_docs.
        :param results:
            The results of the requests created in construct_requests.
        """
        golds, preds = doc["answers"], results
        exact_match = self._exact_match(golds, preds)
        f1_score = self._f1_score(golds, preds)
        return {
            "em": exact_match,
            "f1": f1_score,
        }

    def _exact_match(self, golds, preds):
        """Returns the exact match of normalized gold answers and predictions."""
        normalized_golds = set([self._normalize(gold) for gold in golds])
        normalized_preds = set([self._normalize(pred) for pred in preds])
        return int(normalized_golds == normalized_preds)

    def _f1_score(self, golds, preds):
        """Returns the average F1-score over normalized gold answers and predictions."""
        gold_bags = self._answer_to_bags(golds)
        pred_bags = self._answer_to_bags(preds)
        f1_per_bag = self._align_bags(gold_bags, pred_bags)
        return np.mean(f1_per_bag)

    def _answer_to_bags(self, answers):
        # Represent each answer as a bag (set) of normalized tokens.
        return [set(self._normalize(answer).split()) for answer in answers]

    def _align_bags(self, gold_bags, pred_bags):
        """Returns the max metric value over all the answers."""
        scores = np.zeros([len(gold_bags), len(pred_bags)])
        for gold_index, gold_bag in enumerate(gold_bags):
            for pred_index, pred_bag in enumerate(pred_bags):
                # Only score pairs whose numbers agree (or where the gold has no numbers).
                if self._is_number_match(gold_bag, pred_bag):
                    scores[gold_index, pred_index] = self._bag_f1(gold_bag, pred_bag)
        # Hungarian assignment: find the one-to-one gold/prediction matching that
        # maximizes total F1 (linear_sum_assignment minimizes, hence the negation).
        row_ind, col_ind = linear_sum_assignment(-scores)
        max_scores = np.zeros([max(len(gold_bags), len(pred_bags))])
        for row, column in zip(row_ind, col_ind):
            max_scores[row] = max(max_scores[row], scores[row, column])
        return max_scores

    def _bag_f1(self, gold_bag, pred_bag):
        intersection = len(gold_bag.intersection(pred_bag))
        if intersection == 0:
            return 0.0
        precision = intersection / float(len(pred_bag)) if pred_bag else 1.0
        recall = intersection / float(len(gold_bag)) if gold_bag else 1.0
        f1 = (2 * precision * recall) / (precision + recall)
        return f1

    def _is_number_match(self, gold_bag, pred_bag):
        # A pair is eligible for scoring if the gold bag contains no numbers,
        # or the prediction reproduces at least one of the gold numbers.
        gold_numbers = set(filter(lambda s: s.isnumeric(), list(gold_bag)))
        pred_numbers = set(filter(lambda s: s.isnumeric(), list(pred_bag)))
        return (not gold_numbers) or gold_numbers.intersection(pred_numbers)

    def _normalize(self, answer):
        # Split on spaces and hyphens, then apply SQuAD answer normalization per token.
        def tokenize(text):
            return re.split(" |-", text)

        tokens = [squad_metrics.normalize_answer(token) for token in tokenize(answer)]
        tokens = [token for token in tokens if token.strip()]
        normalized = " ".join(tokens).strip()
        return normalized

    def aggregation(self):
        """
        :returns: {str: [float] -> float}
            A dictionary where keys are the names of submetrics and values are
            functions that aggregate a list of metrics
        """
        return {
            "em": mean,
            "f1": mean,
        }

    def higher_is_better(self):
        """
        :returns: {str: bool}
            A dictionary where keys are the names of submetrics and values are
            whether a higher value of the submetric is better
        """
        return {
            "em": True,
            "f1": True,
        }