Commit 25991f98 authored by hepj

Modify README

parent ac192496
Pipeline #1415 failed in 0 seconds
git+https://github.com/huggingface/evaluate@main
test_cases = [
{
"predictions": [0, 0],
"references": [1, 1],
"result": {"metric_score": 0}
},
{
"predictions": [1, 1],
"references": [1, 1],
"result": {"metric_score": 1}
},
{
"predictions": [1, 0],
"references": [1, 1],
"result": {"metric_score": 0.5}
}
]
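# --- Editor's hedged sketch (not in the original file) ---
# A self-contained way to sanity-check the cases above: a stand-in compute()
# mirroring the accuracy-style logic of the module template later in this
# commit. A real check would instead load the actual module with evaluate.load(...).
import math


def _toy_compute(predictions, references):
    # Fraction of positions where prediction and reference agree.
    matches = sum(p == r for p, r in zip(predictions, references))
    return {"metric_score": matches / len(predictions)}


for case in test_cases:
    result = _toy_compute(case["predictions"], case["references"])
    assert math.isclose(result["metric_score"], case["result"]["metric_score"])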
# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TODO: Add a description here."""
import evaluate
import datasets
# TODO: Add BibTeX citation
_CITATION = """\
@InProceedings{huggingface:module,
title = {A great new module},
authors={huggingface, Inc.},
year={2020}
}
"""
# TODO: Add description of the module here
_DESCRIPTION = """\
This new module is designed to solve this great ML task and is crafted with a lot of care.
"""
# TODO: Add description of the arguments of the module here
_KWARGS_DESCRIPTION = """
Calculates how good the predictions are, given some references, using certain scores.
Args:
predictions: list of predictions to score. Each prediction
should be a string with tokens separated by spaces.
references: list of references, one per prediction. Each
reference should be a string with tokens separated by spaces.
Returns:
accuracy: description of the first score,
another_score: description of the second score,
Examples:
Examples should be written in doctest format, and should illustrate how
to use the function.
>>> my_new_module = evaluate.load("my_new_module")
>>> results = my_new_module.compute(references=[0, 1], predictions=[0, 1])
>>> print(results)
{'accuracy': 1.0}
"""
# TODO: Define external resources urls if needed
BAD_WORDS_URL = "http://url/to/external/resource/bad_words.txt"
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class {{ cookiecutter.module_class_name }}(evaluate.{{ cookiecutter.module_type | capitalize}}):
"""TODO: Short description of my evaluation module."""
def _info(self):
# TODO: Specifies the evaluate.EvaluationModuleInfo object
return evaluate.{{ cookiecutter.module_type | capitalize}}Info(
# This is the description that will appear on the modules page.
module_type="{{ cookiecutter.module_type}}",
description=_DESCRIPTION,
citation=_CITATION,
inputs_description=_KWARGS_DESCRIPTION,
# This defines the format of each prediction and reference
features=datasets.Features({
'predictions': datasets.Value('int64'),
'references': datasets.Value('int64'),
}),
# Homepage of the module for documentation
homepage="http://module.homepage",
# Additional links to the codebase or references
codebase_urls=["http://github.com/path/to/codebase/of/new_module"],
reference_urls=["http://path.to.reference.url/new_module"]
)
def _download_and_prepare(self, dl_manager):
"""Optional: download external resources useful to compute the scores"""
# TODO: Download external resources if needed
pass
def _compute(self, predictions, references):
"""Returns the scores"""
# TODO: Compute the different scores of the module
accuracy = sum(i == j for i, j in zip(predictions, references)) / len(predictions)
return {
"accuracy": accuracy,
}
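# --- Editor's hedged sketch (not part of the template) ---
# Roughly what the class above renders to once cookiecutter fills in the
# placeholders for a metric module. The class name "MetricScore" and the
# "metric_score" result key are illustrative assumptions, chosen to line up
# with the test_cases file earlier in this commit.
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class MetricScore(evaluate.Metric):
    def _info(self):
        return evaluate.MetricInfo(
            module_type="metric",
            description=_DESCRIPTION,
            citation=_CITATION,
            inputs_description=_KWARGS_DESCRIPTION,
            features=datasets.Features(
                {"predictions": datasets.Value("int64"), "references": datasets.Value("int64")}
            ),
        )

    def _compute(self, predictions, references):
        # Same accuracy-style score as the template's _compute above.
        matches = sum(p == r for p, r in zip(predictions, references))
        return {"metric_score": matches / len(predictions)}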
import csv
import json
import lzma
import os
import tarfile
import textwrap
import datasets
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from datasets import config
from datasets.arrow_dataset import Dataset
from datasets.features import ClassLabel, Features, Sequence, Value
@pytest.fixture(autouse=True)
def set_test_cache_config(tmp_path_factory, monkeypatch):
# test_hf_cache_home = tmp_path_factory.mktemp("cache") # TODO: why a cache dir per test function does not work?
test_hf_cache_home = tmp_path_factory.getbasetemp() / "cache"
test_hf_evaluate_cache = test_hf_cache_home / "datasets"
test_hf_metrics_cache = test_hf_cache_home / "metrics"
test_hf_modules_cache = test_hf_cache_home / "modules"
monkeypatch.setattr("evaluate.config.HF_EVALUATE_CACHE", str(test_hf_evaluate_cache))
monkeypatch.setattr("evaluate.config.HF_METRICS_CACHE", str(test_hf_metrics_cache))
monkeypatch.setattr("evaluate.config.HF_MODULES_CACHE", str(test_hf_modules_cache))
test_DOWNLOADED_EVALUATE_PATH = test_hf_evaluate_cache / "downloads"
monkeypatch.setattr("evaluate.config.DOWNLOADED_EVALUATE_PATH", str(test_DOWNLOADED_EVALUATE_PATH))
test_EXTRACTED_EVALUATE_PATH = test_hf_evaluate_cache / "downloads" / "extracted"
monkeypatch.setattr("evaluate.config.EXTRACTED_EVALUATE_PATH", str(test_EXTRACTED_EVALUATE_PATH))
@pytest.fixture(autouse=True, scope="session")
def disable_tqdm_output():
datasets.disable_progress_bar()
@pytest.fixture(autouse=True)
def set_update_download_counts_to_false(monkeypatch):
# don't take tests into account when counting downloads
monkeypatch.setattr("evaluate.config.HF_UPDATE_DOWNLOAD_COUNTS", False)
monkeypatch.setattr("datasets.config.HF_UPDATE_DOWNLOAD_COUNTS", False)
FILE_CONTENT = """\
Text data.
Second line of data."""
@pytest.fixture(scope="session")
def dataset():
n = 10
features = Features(
{
"tokens": Sequence(Value("string")),
"labels": Sequence(ClassLabel(names=["negative", "positive"])),
"answers": Sequence(
{
"text": Value("string"),
"answer_start": Value("int32"),
}
),
"id": Value("int64"),
}
)
dataset = Dataset.from_dict(
{
"tokens": [["foo"] * 5] * n,
"labels": [[1] * 5] * n,
"answers": [{"answer_start": [97], "text": ["1976"]}] * 10,
"id": list(range(n)),
},
features=features,
)
return dataset
@pytest.fixture(scope="session")
def arrow_file(tmp_path_factory, dataset):
filename = str(tmp_path_factory.mktemp("data") / "file.arrow")
dataset.map(cache_file_name=filename)
return filename
@pytest.fixture(scope="session")
def text_file(tmp_path_factory):
filename = tmp_path_factory.mktemp("data") / "file.txt"
data = FILE_CONTENT
with open(filename, "w") as f:
f.write(data)
return filename
@pytest.fixture(scope="session")
def xz_file(tmp_path_factory):
filename = tmp_path_factory.mktemp("data") / "file.txt.xz"
data = bytes(FILE_CONTENT, "utf-8")
with lzma.open(filename, "wb") as f:
f.write(data)
return filename
@pytest.fixture(scope="session")
def gz_file(tmp_path_factory):
import gzip
path = str(tmp_path_factory.mktemp("data") / "file.txt.gz")
data = bytes(FILE_CONTENT, "utf-8")
with gzip.open(path, "wb") as f:
f.write(data)
return path
@pytest.fixture(scope="session")
def bz2_file(tmp_path_factory):
import bz2
path = tmp_path_factory.mktemp("data") / "file.txt.bz2"
data = bytes(FILE_CONTENT, "utf-8")
with bz2.open(path, "wb") as f:
f.write(data)
return path
@pytest.fixture(scope="session")
def zstd_file(tmp_path_factory):
if config.ZSTANDARD_AVAILABLE:
import zstandard as zstd
path = tmp_path_factory.mktemp("data") / "file.txt.zst"
data = bytes(FILE_CONTENT, "utf-8")
with zstd.open(path, "wb") as f:
f.write(data)
return path
@pytest.fixture(scope="session")
def lz4_file(tmp_path_factory):
if config.LZ4_AVAILABLE:
import lz4.frame
path = tmp_path_factory.mktemp("data") / "file.txt.lz4"
data = bytes(FILE_CONTENT, "utf-8")
with lz4.frame.open(path, "wb") as f:
f.write(data)
return path
@pytest.fixture(scope="session")
def xml_file(tmp_path_factory):
filename = tmp_path_factory.mktemp("data") / "file.xml"
data = textwrap.dedent(
"""\
<?xml version="1.0" encoding="UTF-8" ?>
<tmx version="1.4">
<header segtype="sentence" srclang="ca" />
<body>
<tu>
<tuv xml:lang="ca"><seg>Contingut 1</seg></tuv>
<tuv xml:lang="en"><seg>Content 1</seg></tuv>
</tu>
<tu>
<tuv xml:lang="ca"><seg>Contingut 2</seg></tuv>
<tuv xml:lang="en"><seg>Content 2</seg></tuv>
</tu>
<tu>
<tuv xml:lang="ca"><seg>Contingut 3</seg></tuv>
<tuv xml:lang="en"><seg>Content 3</seg></tuv>
</tu>
<tu>
<tuv xml:lang="ca"><seg>Contingut 4</seg></tuv>
<tuv xml:lang="en"><seg>Content 4</seg></tuv>
</tu>
<tu>
<tuv xml:lang="ca"><seg>Contingut 5</seg></tuv>
<tuv xml:lang="en"><seg>Content 5</seg></tuv>
</tu>
</body>
</tmx>"""
)
with open(filename, "w") as f:
f.write(data)
return filename
DATA = [
{"col_1": "0", "col_2": 0, "col_3": 0.0},
{"col_1": "1", "col_2": 1, "col_3": 1.0},
{"col_1": "2", "col_2": 2, "col_3": 2.0},
{"col_1": "3", "col_2": 3, "col_3": 3.0},
]
DATA2 = [
{"col_1": "4", "col_2": 4, "col_3": 4.0},
{"col_1": "5", "col_2": 5, "col_3": 5.0},
]
DATA_DICT_OF_LISTS = {
"col_1": ["0", "1", "2", "3"],
"col_2": [0, 1, 2, 3],
"col_3": [0.0, 1.0, 2.0, 3.0],
}
DATA_312 = [
{"col_3": 0.0, "col_1": "0", "col_2": 0},
{"col_3": 1.0, "col_1": "1", "col_2": 1},
]
DATA_STR = [
{"col_1": "s0", "col_2": 0, "col_3": 0.0},
{"col_1": "s1", "col_2": 1, "col_3": 1.0},
{"col_1": "s2", "col_2": 2, "col_3": 2.0},
{"col_1": "s3", "col_2": 3, "col_3": 3.0},
]
@pytest.fixture(scope="session")
def dataset_dict():
return DATA_DICT_OF_LISTS
@pytest.fixture(scope="session")
def arrow_path(tmp_path_factory):
dataset = Dataset.from_dict(DATA_DICT_OF_LISTS)
path = str(tmp_path_factory.mktemp("data") / "dataset.arrow")
dataset.map(cache_file_name=path)
return path
@pytest.fixture(scope="session")
def csv_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset.csv")
with open(path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=["col_1", "col_2", "col_3"])
writer.writeheader()
for item in DATA:
writer.writerow(item)
return path
@pytest.fixture(scope="session")
def csv2_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset2.csv")
with open(path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=["col_1", "col_2", "col_3"])
writer.writeheader()
for item in DATA:
writer.writerow(item)
return path
@pytest.fixture(scope="session")
def bz2_csv_path(csv_path, tmp_path_factory):
import bz2
path = tmp_path_factory.mktemp("data") / "dataset.csv.bz2"
with open(csv_path, "rb") as f:
data = f.read()
# data = bytes(FILE_CONTENT, "utf-8")
with bz2.open(path, "wb") as f:
f.write(data)
return path
@pytest.fixture(scope="session")
def zip_csv_path(csv_path, csv2_path, tmp_path_factory):
import zipfile
path = tmp_path_factory.mktemp("data") / "dataset.csv.zip"
with zipfile.ZipFile(path, "w") as f:
f.write(csv_path, arcname=os.path.basename(csv_path))
f.write(csv2_path, arcname=os.path.basename(csv2_path))
return path
@pytest.fixture(scope="session")
def zip_csv_with_dir_path(csv_path, csv2_path, tmp_path_factory):
import zipfile
path = tmp_path_factory.mktemp("data") / "dataset_with_dir.csv.zip"
with zipfile.ZipFile(path, "w") as f:
f.write(csv_path, arcname=os.path.join("main_dir", os.path.basename(csv_path)))
f.write(csv2_path, arcname=os.path.join("main_dir", os.path.basename(csv2_path)))
return path
@pytest.fixture(scope="session")
def parquet_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset.parquet")
schema = pa.schema(
{
"col_1": pa.string(),
"col_2": pa.int64(),
"col_3": pa.float64(),
}
)
with open(path, "wb") as f:
writer = pq.ParquetWriter(f, schema=schema)
pa_table = pa.Table.from_pydict({k: [DATA[i][k] for i in range(len(DATA))] for k in DATA[0]}, schema=schema)
writer.write_table(pa_table)
writer.close()
return path
@pytest.fixture(scope="session")
def json_list_of_dicts_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset.json")
data = {"data": DATA}
with open(path, "w") as f:
json.dump(data, f)
return path
@pytest.fixture(scope="session")
def json_dict_of_lists_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset.json")
data = {"data": DATA_DICT_OF_LISTS}
with open(path, "w") as f:
json.dump(data, f)
return path
@pytest.fixture(scope="session")
def jsonl_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset.jsonl")
with open(path, "w") as f:
for item in DATA:
f.write(json.dumps(item) + "\n")
return path
@pytest.fixture(scope="session")
def jsonl2_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset2.jsonl")
with open(path, "w") as f:
for item in DATA:
f.write(json.dumps(item) + "\n")
return path
@pytest.fixture(scope="session")
def jsonl_312_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset_312.jsonl")
with open(path, "w") as f:
for item in DATA_312:
f.write(json.dumps(item) + "\n")
return path
@pytest.fixture(scope="session")
def jsonl_str_path(tmp_path_factory):
path = str(tmp_path_factory.mktemp("data") / "dataset-str.jsonl")
with open(path, "w") as f:
for item in DATA_STR:
f.write(json.dumps(item) + "\n")
return path
@pytest.fixture(scope="session")
def text_gz_path(tmp_path_factory, text_path):
import gzip
path = str(tmp_path_factory.mktemp("data") / "dataset.txt.gz")
with open(text_path, "rb") as orig_file:
with gzip.open(path, "wb") as zipped_file:
zipped_file.writelines(orig_file)
return path
@pytest.fixture(scope="session")
def jsonl_gz_path(tmp_path_factory, jsonl_path):
import gzip
path = str(tmp_path_factory.mktemp("data") / "dataset.jsonl.gz")
with open(jsonl_path, "rb") as orig_file:
with gzip.open(path, "wb") as zipped_file:
zipped_file.writelines(orig_file)
return path
@pytest.fixture(scope="session")
def zip_jsonl_path(jsonl_path, jsonl2_path, tmp_path_factory):
import zipfile
path = tmp_path_factory.mktemp("data") / "dataset.jsonl.zip"
with zipfile.ZipFile(path, "w") as f:
f.write(jsonl_path, arcname=os.path.basename(jsonl_path))
f.write(jsonl2_path, arcname=os.path.basename(jsonl2_path))
return path
@pytest.fixture(scope="session")
def zip_jsonl_with_dir_path(jsonl_path, jsonl2_path, tmp_path_factory):
import zipfile
path = tmp_path_factory.mktemp("data") / "dataset_with_dir.jsonl.zip"
with zipfile.ZipFile(path, "w") as f:
f.write(jsonl_path, arcname=os.path.join("main_dir", os.path.basename(jsonl_path)))
f.write(jsonl2_path, arcname=os.path.join("main_dir", os.path.basename(jsonl2_path)))
return path
@pytest.fixture(scope="session")
def tar_jsonl_path(jsonl_path, jsonl2_path, tmp_path_factory):
path = tmp_path_factory.mktemp("data") / "dataset.jsonl.tar"
with tarfile.TarFile(path, "w") as f:
f.add(jsonl_path, arcname=os.path.basename(jsonl_path))
f.add(jsonl2_path, arcname=os.path.basename(jsonl2_path))
return path
@pytest.fixture(scope="session")
def tar_nested_jsonl_path(tar_jsonl_path, jsonl_path, jsonl2_path, tmp_path_factory):
path = tmp_path_factory.mktemp("data") / "dataset_nested.jsonl.tar"
with tarfile.TarFile(path, "w") as f:
f.add(tar_jsonl_path, arcname=os.path.join("nested", os.path.basename(tar_jsonl_path)))
return path
@pytest.fixture(scope="session")
def text_path(tmp_path_factory):
data = ["0", "1", "2", "3"]
path = str(tmp_path_factory.mktemp("data") / "dataset.txt")
with open(path, "w") as f:
for item in data:
f.write(item + "\n")
return path
@pytest.fixture(scope="session")
def text2_path(tmp_path_factory):
data = ["0", "1", "2", "3"]
path = str(tmp_path_factory.mktemp("data") / "dataset2.txt")
with open(path, "w") as f:
for item in data:
f.write(item + "\n")
return path
@pytest.fixture(scope="session")
def zip_text_path(text_path, text2_path, tmp_path_factory):
import zipfile
path = tmp_path_factory.mktemp("data") / "dataset.text.zip"
with zipfile.ZipFile(path, "w") as f:
f.write(text_path, arcname=os.path.basename(text_path))
f.write(text2_path, arcname=os.path.basename(text2_path))
return path
@pytest.fixture(scope="session")
def zip_text_with_dir_path(text_path, text2_path, tmp_path_factory):
import zipfile
path = tmp_path_factory.mktemp("data") / "dataset_with_dir.text.zip"
with zipfile.ZipFile(path, "w") as f:
f.write(text_path, arcname=os.path.join("main_dir", os.path.basename(text_path)))
f.write(text2_path, arcname=os.path.join("main_dir", os.path.basename(text2_path)))
return path
@pytest.fixture(scope="session")
def text_path_with_unicode_new_lines(tmp_path_factory):
text = "\n".join(["First", "Second\u2029with Unicode new line", "Third"])
path = str(tmp_path_factory.mktemp("data") / "dataset_with_unicode_new_lines.txt")
with open(path, "w", encoding="utf-8") as f:
f.write(text)
return path
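# --- Editor's hedged sketch (illustrative only; real tests live outside conftest) ---
# Shows how a test might consume the fixtures above: pytest injects
# csv_path/csv2_path/zip_csv_path by name, and the zip fixture is expected to
# contain both CSV files at the archive root.
def _example_zip_csv_contents(zip_csv_path, csv_path, csv2_path):
    import zipfile

    with zipfile.ZipFile(zip_csv_path) as zf:
        names = sorted(zf.namelist())
    assert names == sorted(os.path.basename(p) for p in (csv_path, csv2_path))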
from unittest import TestCase
from evaluate import EvaluationSuite
from tests.test_evaluator import DummyTextClassificationPipeline
class TestEvaluationSuite(TestCase):
def setUp(self):
# Check that the EvaluationSuite loads successfully
self.evaluation_suite = EvaluationSuite.load("evaluate/evaluation-suite-ci")
# Setup a dummy model for usage with the EvaluationSuite
self.dummy_model = DummyTextClassificationPipeline()
def test_running_evaluation_suite(self):
# Check that the evaluation suite successfully runs
results = self.evaluation_suite.run(self.dummy_model)
# Check that the results are correct
for r in results:
self.assertEqual(r["accuracy"], 0.5)
# Check that correct number of tasks were run
self.assertEqual(len(results), 2)
def test_empty_suite(self):
self.empty_suite = self.evaluation_suite
self.empty_suite.suite = []
self.assertRaises(ValueError, self.empty_suite.run, self.dummy_model)
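# --- Editor's hedged sketch (not part of the test file) ---
# Rough shape of the suite definition that EvaluationSuite.load() pulls in above
# (e.g. "evaluate/evaluation-suite-ci"). The dataset, split, and task arguments
# below are illustrative assumptions, not the actual CI suite contents.
from evaluate.evaluation_suite import SubTask


class ExampleSuite(EvaluationSuite):
    def __init__(self, name):
        super().__init__(name)
        # Each SubTask pairs a task type and dataset with evaluator arguments.
        self.suite = [
            SubTask(
                task_type="text-classification",
                data="imdb",
                split="test[:10]",
                args_for_task={
                    "metric": "accuracy",
                    "input_column": "text",
                    "label_column": "label",
                    "label_mapping": {"LABEL_0": 0.0, "LABEL_1": 1.0},
                },
            ),
        ]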
# Copyright 2022 The HuggingFace Datasets Authors and the TensorFlow Datasets Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
from time import sleep
from unittest import TestCase, mock
import numpy as np
from datasets import ClassLabel, Dataset, Features, Sequence, Value
from PIL import Image
from transformers import (
AutoConfig,
AutoFeatureExtractor,
AutoModelForAudioClassification,
AutoModelForImageClassification,
AutoModelForQuestionAnswering,
AutoModelForSequenceClassification,
AutoModelForTokenClassification,
AutoTokenizer,
pipeline,
)
from evaluate import (
AudioClassificationEvaluator,
AutomaticSpeechRecognitionEvaluator,
Evaluator,
ImageClassificationEvaluator,
QuestionAnsweringEvaluator,
Text2TextGenerationEvaluator,
TextClassificationEvaluator,
TextGenerationEvaluator,
TokenClassificationEvaluator,
evaluator,
load,
)
from .utils import slow
class DummyTextGenerationPipeline:
def __init__(self, prefix="generated", task="text-generation", num_return_sequences=1):
self.task = task
self.prefix = prefix
self.num_return_sequences = num_return_sequences
def __call__(self, inputs, **kwargs):
return [[{f"{self.prefix}_text": "Lorem ipsum"} for _ in range(self.num_return_sequences)] for _ in inputs]
class DummyText2TextGenerationPipeline:
def __init__(self, prefix="generated", task="text2text-generation"):
self.task = task
self.prefix = prefix
def __call__(self, inputs, **kwargs):
return [{f"{self.prefix}_text": "Lorem ipsum"} for _ in inputs]
class DummyTextClassificationPipeline:
def __init__(self, sleep_time=None):
self.task = "text-classification"
self.sleep_time = sleep_time
def __call__(self, inputs, **kwargs):
if self.sleep_time is not None:
sleep(self.sleep_time)
return [{"label": "NEGATIVE"} if i % 2 == 1 else {"label": "POSITIVE"} for i, _ in enumerate(inputs)]
class DummyImageClassificationPipeline:
def __init__(self):
self.task = "image-classification"
def __call__(self, images, **kwargs):
return [[{"score": 0.9, "label": "yurt"}, {"score": 0.1, "label": "umbrella"}] for i, _ in enumerate(images)]
class DummyQuestionAnsweringPipeline:
def __init__(self, v2: bool):
self.task = "question-answering"
self.v2 = v2
def __call__(self, question, context, **kwargs):
if self.v2:
return [
{"score": 0.95, "start": 31, "end": 39, "answer": "Felix"}
if i % 2 == 0
else {"score": 0.95, "start": 0, "end": 0, "answer": ""}
for i in range(len(question))
]
else:
return [{"score": 0.95, "start": 31, "end": 39, "answer": "Felix"} for _ in question]
class DummyTokenClassificationPipeline:
def __init__(self):
self.task = "token-classification"
def __call__(self, inputs, **kwargs):
result = [
{"start": 0, "entity": "B-LOC"},
{"start": 2, "entity": "I-LOC"},
{"start": 4, "entity": "I-LOC"},
{"start": 9, "entity": "O"},
{"start": 11, "entity": "O"},
{"start": 16, "entity": "B-LOC"},
{"start": 21, "entity": "O"},
]
return [result]
class DummyAutomaticSpeechRecognitionPipeline:
def __init__(self) -> None:
self.task = "automatic-speech-recognition"
def __call__(self, inputs, **kwargs):
return [{"text": "Lorem ipsum"} for _ in inputs]
class DummyAudioClassificationPipeline:
def __init__(self):
self.task = "audio-classification"
def __call__(self, audio, **kwargs):
return [[{"score": 0.9, "label": "yes"}, {"score": 0.1, "label": "no"}] for i, _ in enumerate(audio)]
class TestEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict({"label": [1, 0], "text": ["great movie", "horrible movie"]})
self.default_ckpt = "hf-internal-testing/tiny-random-bert"
self.default_model = AutoModelForSequenceClassification.from_pretrained(self.default_ckpt, num_labels=2)
self.default_tokenizer = AutoTokenizer.from_pretrained(self.default_ckpt)
self.pipe = pipeline("text-classification", model=self.default_model, tokenizer=self.default_tokenizer)
self.evaluator = evaluator("text-classification")
self.data = Dataset.from_dict({"label": [1, 0], "text": ["great movie", "horrible movie"]})
self.label_mapping = {"LABEL_0": 0.0, "LABEL_1": 1.0}
def test_wrong_task(self):
self.assertRaises(KeyError, evaluator, "bad_task")
def test_device_placement(self):
orig_import = __import__
pt_mock = mock.Mock()
tf_mock = mock.Mock()
# mock import of torch and tensorflow
def import_pt_tf_mock(name, *args):
if name == "torch":
if pt_available:
return pt_mock
else:
raise ImportError
if name == "tensorflow":
if tf_available:
return tf_mock
else:
raise ImportError
return orig_import(name, *args)
with mock.patch("builtins.__import__", side_effect=import_pt_tf_mock):
# neither pt or tf are available
pt_available = False
tf_available = False
self.assertEqual(Evaluator._infer_device(), -1)
# pt available but no GPU
pt_available = True
pt_mock.cuda.is_available.return_value = False
self.assertEqual(Evaluator._infer_device(), -1)
# pt available and GPU found
pt_mock.cuda.is_available.return_value = True
self.assertEqual(Evaluator._infer_device(), 0)
# tf available but no GPU
pt_available = False
tf_available = True
tf_mock.config.list_physical_devices.return_value = []
self.assertEqual(Evaluator._infer_device(), -1)
# tf available and GPU found
tf_mock.config.list_physical_devices.return_value = ["GPU:0", "GPU:1"]
self.assertEqual(Evaluator._infer_device(), 0)
# pt accelerator found and pipeline instantiated on CPU
pt_mock.cuda.is_available.return_value = True
self.assertRaises(
ValueError, Evaluator.check_for_mismatch_in_device_setup, Evaluator._infer_device(), self.pipe
)
# tf accelerator found and pipeline instantiated on CPU
pt_available = False
tf_available = True
self.assertRaises(
ValueError, Evaluator.check_for_mismatch_in_device_setup, Evaluator._infer_device(), self.pipe
)
def test_pipe_init(self):
self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
input_column="text",
label_column="label",
label_mapping=self.label_mapping,
)
def test_model_init(self):
self.evaluator.compute(
model_or_pipeline=self.default_model,
tokenizer=self.default_tokenizer,
data=self.data,
input_column="text",
label_column="label",
label_mapping=self.label_mapping,
)
def test_model_str_init(self):
self.evaluator.compute(
model_or_pipeline=self.default_ckpt,
data=self.data,
input_column="text",
label_column="label",
label_mapping=self.label_mapping,
)
class TestTextClassificationEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict({"label": [1, 0], "text": ["great movie", "horrible movie"]})
self.default_model = "lvwerra/distilbert-imdb"
self.input_column = "text"
self.label_column = "label"
self.pipe = DummyTextClassificationPipeline()
self.perf_pipe = DummyTextClassificationPipeline(sleep_time=0.1)
self.evaluator = evaluator("text-classification")
self.label_mapping = {"NEGATIVE": 0.0, "POSITIVE": 1.0}
def test_pipe_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
input_column="text",
label_column="label",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 1.0)
@slow
def test_model_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.default_model,
data=self.data,
metric="accuracy",
input_column=self.input_column,
label_column=self.label_column,
label_mapping=self.label_mapping,
)
model = AutoModelForSequenceClassification.from_pretrained(self.default_model)
tokenizer = AutoTokenizer.from_pretrained(self.default_model)
self.assertEqual(results["accuracy"], 1.0)
results = self.evaluator.compute(
model_or_pipeline=model,
data=self.data,
metric="accuracy",
tokenizer=tokenizer,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 1.0)
def test_class_init(self):
evaluator = TextClassificationEvaluator()
self.assertEqual(evaluator.task, "text-classification")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="f1",
label_mapping=self.label_mapping,
)
self.assertEqual(results["f1"], 1.0)
@slow
def test_default_pipe_init(self):
results = self.evaluator.compute(
data=self.data,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 1.0)
def test_data_loading(self):
# Test passing in dataset by name with split
data = self.evaluator.load_data("evaluate/imdb-ci", split="test[:1]")
self.evaluator.prepare_data(data=data, input_column="text", label_column="label", second_input_column=None)
# Test passing in dataset by name without split and inferring the optimal split
data = self.evaluator.load_data("evaluate/imdb-ci")
self.evaluator.prepare_data(data=data, input_column="text", label_column="label", second_input_column=None)
# Test that it chooses the correct one (e.g. imdb only has train and test, but no validation)
self.assertEqual(data.split, "test")
# Test that the data point returned is correct; this maps to the first example in the dataset
self.assertEqual(data[0]["text"], "I love movies about whales!")
# Test loading subset of a dataset with the `name` field
data = self.evaluator.load_data("evaluate/glue-ci", subset="cola", split="test")
self.assertEqual(isinstance(data, Dataset), True)
# Test loading subset of a dataset with the `name` field and having it infer the split
data = self.evaluator.load_data("evaluate/glue-ci", subset="cola")
self.assertEqual(isinstance(data, Dataset), True)
def test_overwrite_default_metric(self):
accuracy = load("accuracy")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=accuracy,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 1.0)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="accuracy",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 1.0)
def test_bootstrap(self):
data = Dataset.from_dict({"label": [1, 0, 0], "text": ["great movie", "great movie", "horrible movie"]})
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=data,
metric="accuracy",
label_mapping=self.label_mapping,
strategy="bootstrap",
n_resamples=10,
random_state=0,
)
self.assertAlmostEqual(results["accuracy"]["score"], 0.666666, 5)
self.assertAlmostEqual(results["accuracy"]["confidence_interval"][0], 0.33557, 5)
self.assertAlmostEqual(results["accuracy"]["confidence_interval"][1], 1.0, 5)
self.assertAlmostEqual(results["accuracy"]["standard_error"], 0.22498, 5)
def test_perf(self):
results = self.evaluator.compute(
model_or_pipeline=self.perf_pipe,
data=self.data,
metric="accuracy",
input_column=self.input_column,
label_column=self.label_column,
label_mapping=self.label_mapping,
n_resamples=10,
random_state=0,
)
self.assertEqual(results["accuracy"], 1.0)
self.assertAlmostEqual(results["total_time_in_seconds"], 0.1, 1)
self.assertAlmostEqual(results["samples_per_second"], len(self.data) / results["total_time_in_seconds"], 5)
self.assertAlmostEqual(results["latency_in_seconds"], results["total_time_in_seconds"] / len(self.data), 5)
def test_bootstrap_and_perf(self):
data = Dataset.from_dict({"label": [1, 0, 0], "text": ["great movie", "great movie", "horrible movie"]})
results = self.evaluator.compute(
model_or_pipeline=self.perf_pipe,
data=data,
metric="accuracy",
input_column=self.input_column,
label_column=self.label_column,
label_mapping=self.label_mapping,
strategy="bootstrap",
n_resamples=10,
random_state=0,
)
self.assertAlmostEqual(results["accuracy"]["score"], 0.666666, 5)
self.assertAlmostEqual(results["accuracy"]["confidence_interval"][0], 0.33557, 5)
self.assertAlmostEqual(results["accuracy"]["confidence_interval"][1], 1.0, 5)
self.assertAlmostEqual(results["accuracy"]["standard_error"], 0.22498285, 5)
self.assertAlmostEqual(results["total_time_in_seconds"], 0.1, 1)
self.assertAlmostEqual(results["samples_per_second"], len(data) / results["total_time_in_seconds"], 5)
self.assertAlmostEqual(results["latency_in_seconds"], results["total_time_in_seconds"] / len(data), 5)
class TestTextClassificationEvaluatorTwoColumns(TestCase):
def setUp(self):
self.data = Dataset.from_dict(
{
"label": [1, 0],
"premise": ["great car", "great movie"],
"hypothesis": ["great vehicle", "horrible movie"],
}
)
self.default_model = "prajjwal1/bert-tiny-mnli"
self.input_column = "premise"
self.second_input_column = "hypothesis"
self.label_column = "label"
self.pipe = DummyTextClassificationPipeline()
self.evaluator = evaluator("text-classification")
self.label_mapping = {"NEGATIVE": 0.0, "POSITIVE": 1.0}
self.label_mapping2 = {"LABEL_0": 0, "LABEL_1": 1, "LABEL_2": 2}
def test_pipe_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
input_column=self.input_column,
second_input_column=self.second_input_column,
label_column="label",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 1.0)
@slow
def test_model_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.default_model,
data=self.data,
metric="accuracy",
input_column=self.input_column,
second_input_column=self.second_input_column,
label_column=self.label_column,
label_mapping=self.label_mapping2,
)
self.assertEqual(results["accuracy"], 1.0)
model = AutoModelForSequenceClassification.from_pretrained(self.default_model)
tokenizer = AutoTokenizer.from_pretrained(self.default_model)
results = self.evaluator.compute(
model_or_pipeline=model,
data=self.data,
metric="accuracy",
input_column=self.input_column,
second_input_column=self.second_input_column,
tokenizer=tokenizer,
label_mapping=self.label_mapping2,
)
self.assertEqual(results["accuracy"], 1.0)
class TestImageClassificationEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict(
{
"label": [2, 2],
"image": [Image.new("RGB", (500, 500), (255, 255, 255)), Image.new("RGB", (500, 500), (170, 95, 170))],
}
)
self.default_model = "lysandre/tiny-vit-random"
self.pipe = DummyImageClassificationPipeline()
self.evaluator = evaluator("image-classification")
self.label_mapping = AutoConfig.from_pretrained(self.default_model).label2id
def test_pipe_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
@slow
def test_model_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.default_model,
data=self.data,
metric="accuracy",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
model = AutoModelForImageClassification.from_pretrained(self.default_model)
feature_extractor = AutoFeatureExtractor.from_pretrained(self.default_model)
results = self.evaluator.compute(
model_or_pipeline=model,
data=self.data,
metric="accuracy",
feature_extractor=feature_extractor,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
def test_class_init(self):
evaluator = ImageClassificationEvaluator()
self.assertEqual(evaluator.task, "image-classification")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="accuracy",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
@slow
def test_default_pipe_init(self):
results = self.evaluator.compute(
data=self.data,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
def test_overwrite_default_metric(self):
accuracy = load("accuracy")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=accuracy,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="accuracy",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
class TestQuestionAnsweringEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict(
{
"id": ["56be4db0acb8001400a502ec", "56be4db0acb8001400a502ed"],
"context": ["My name is Felix and I love cookies!", "Misa name is Felix and misa love cookies!"],
"answers": [{"text": ["Felix"], "answer_start": [11]}, {"text": ["Felix"], "answer_start": [13]}],
"question": ["What is my name?", "What is my name?"],
}
)
self.data_v2 = Dataset.from_dict(
{
"id": ["56be4db0acb8001400a502ec", "56be4db0acb8001400a502ed"],
"context": ["My name is Felix and I love cookies!", "Let's explore the city!"],
"answers": [{"text": ["Felix"], "answer_start": [11]}, {"text": [], "answer_start": []}],
"question": ["What is my name?", "What is my name?"],
}
)
self.default_model = "mrm8488/bert-tiny-finetuned-squadv2"
self.pipe = DummyQuestionAnsweringPipeline(v2=False)
self.pipe_v2 = DummyQuestionAnsweringPipeline(v2=True)
self.evaluator = evaluator("question-answering")
def test_pipe_init(self):
# squad_v1-like dataset
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
)
self.assertEqual(results["exact_match"], 100.0)
self.assertEqual(results["f1"], 100.0)
@slow
def test_model_init(self):
# squad_v1-like dataset
results = self.evaluator.compute(
model_or_pipeline=self.default_model,
data=self.data,
metric="squad",
)
self.assertEqual(results["exact_match"], 0)
self.assertEqual(results["f1"], 100 / 3)
model = AutoModelForQuestionAnswering.from_pretrained(self.default_model)
tokenizer = AutoTokenizer.from_pretrained(self.default_model)
results = self.evaluator.compute(
model_or_pipeline=model,
data=self.data,
metric="squad",
tokenizer=tokenizer,
)
self.assertEqual(results["exact_match"], 0)
self.assertEqual(results["f1"], 100 / 3)
def test_class_init(self):
# squad_v1-like dataset
evaluator = QuestionAnsweringEvaluator()
self.assertEqual(evaluator.task, "question-answering")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="squad",
)
self.assertEqual(results["exact_match"], 100.0)
self.assertEqual(results["f1"], 100.0)
# squad_v2-like dataset
evaluator = QuestionAnsweringEvaluator()
self.assertEqual(evaluator.task, "question-answering")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe_v2,
data=self.data_v2,
metric="squad_v2",
)
self.assertDictEqual(
{key: results[key] for key in ["HasAns_f1", "NoAns_f1"]}, {"HasAns_f1": 100.0, "NoAns_f1": 100.0}
)
@slow
def test_default_pipe_init(self):
# squad_v1-like dataset
results = self.evaluator.compute(
data=self.data,
)
self.assertEqual(results["exact_match"], 100.0)
self.assertEqual(results["f1"], 100.0)
# squad_v2-like dataset
results = self.evaluator.compute(
data=self.data_v2,
metric="squad_v2",
)
self.assertDictEqual(
{key: results[key] for key in ["HasAns_f1", "NoAns_f1"]}, {"HasAns_f1": 100.0, "NoAns_f1": 0.0}
)
def test_data_loading(self):
# Test passing in dataset by name with data_split
data = self.evaluator.load_data("evaluate/squad-ci", split="validation[:1]")
self.evaluator.prepare_data(
data=data, question_column="question", context_column="context", id_column="id", label_column="answers"
)
# Test passing in dataset by name without data_split and inferring the optimal split
data = self.evaluator.load_data("evaluate/squad-ci")
self.evaluator.prepare_data(
data=data, question_column="question", context_column="context", id_column="id", label_column="answers"
)
# Test that it chooses the correct one (e.g. squad only has train and validation, but no test)
self.assertEqual(data.split, "validation")
# Test that the data point returned is correct; this maps to the first example in the squad-ci dataset
self.assertEqual(data[0]["id"], "56be4db0acb8001400a502ec")
def test_overwrite_default_metric(self):
# squad_v1-like dataset
squad = load("squad")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=squad,
)
self.assertEqual(results["exact_match"], 100.0)
self.assertEqual(results["f1"], 100.0)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="squad",
)
self.assertEqual(results["exact_match"], 100.0)
self.assertEqual(results["f1"], 100.0)
class TestTokenClassificationEvaluator(TestCase):
def setUp(self):
features = Features(
{
"tokens": Sequence(feature=Value(dtype="string")),
"ner_tags": Sequence(feature=ClassLabel(names=["O", "B-LOC", "I-LOC"])),
}
)
self.data = Dataset.from_dict(
{
"tokens": [["New", "York", "a", "nice", "City", "."]],
"ner_tags": [[1, 2, 0, 0, 1, 0]],
},
features=features,
)
self.default_model = "hf-internal-testing/tiny-bert-for-token-classification"
self.pipe = DummyTokenClassificationPipeline()
self.evaluator = evaluator("token-classification")
@slow
def test_model_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.default_model,
data=self.data,
metric="seqeval",
)
self.assertEqual(results["overall_accuracy"], 0.5)
model = AutoModelForTokenClassification.from_pretrained(self.default_model)
tokenizer = AutoTokenizer.from_pretrained(self.default_model)
results = self.evaluator.compute(
model_or_pipeline=model,
data=self.data,
metric="seqeval",
tokenizer=tokenizer,
)
self.assertEqual(results["overall_accuracy"], 0.5)
def test_class_init(self):
evaluator = TokenClassificationEvaluator()
self.assertEqual(evaluator.task, "token-classification")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="seqeval",
)
self.assertEqual(results["overall_accuracy"], 1.0)
@slow
def test_default_pipe_init(self):
results = self.evaluator.compute(
data=self.data,
)
self.assertEqual(results["overall_accuracy"], 2 / 3)
def test_overwrite_default_metric(self):
accuracy = load("seqeval")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=accuracy,
)
self.assertEqual(results["overall_accuracy"], 1.0)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="seqeval",
)
self.assertEqual(results["overall_accuracy"], 1.0)
def test_data_loading(self):
# Test passing in dataset by name with data_split
data = self.evaluator.load_data("evaluate/conll2003-ci", split="validation[:1]")
self.evaluator.prepare_data(
data=data,
input_column="tokens",
label_column="ner_tags",
join_by=" ",
)
# Test passing in dataset by name without data_split and inferring the optimal split
data = self.evaluator.load_data("evaluate/conll2003-ci")
self.evaluator.prepare_data(
data=data,
input_column="tokens",
label_column="ner_tags",
join_by=" ",
)
# Test that it chooses the correct one (e.g. conll2003 has train, validation, test but should select test)
self.assertEqual(data.split, "test")
# Test that the data point returned is correct; this maps to the first example in the dataset
self.assertEqual(data[0]["id"], "0")
def test_wrong_task(self):
self.assertRaises(KeyError, evaluator, "bad_task")
def test_words_to_offsets(self):
task_evaluator = evaluator("token-classification")
words = ["This", "is", "a", "test", "."]
join_by = " "
offsets = task_evaluator.words_to_offsets(words, join_by)
self.assertListEqual([(0, 3), (5, 6), (8, 8), (10, 13), (15, 15)], offsets)
words = ["日", "本", "語", "はなせるの?"]
join_by = ""
offsets = task_evaluator.words_to_offsets(words, join_by)
self.assertListEqual([(0, 0), (1, 1), (2, 2), (3, 8)], offsets)
def test_predictions_processor(self):
task_evaluator = evaluator("token-classification")
join_by = " "
words = [["New", "York", "a", "nice", "City", "."]]
# aligned start and words
predictions = [
[
{"start": 0, "entity": "B-LOC"},
{"start": 2, "entity": "I-LOC"},
{"start": 4, "entity": "I-LOC"},
{"start": 9, "entity": "O"},
{"start": 11, "entity": "O"},
{"start": 16, "entity": "B-LOC"},
{"start": 21, "entity": "O"},
]
]
predictions = task_evaluator.predictions_processor(predictions, words, join_by)
self.assertListEqual(predictions["predictions"][0], ["B-LOC", "I-LOC", "O", "O", "B-LOC", "O"])
# non-aligned start and words
predictions = [
[
{"start": 0, "entity": "B-LOC"},
{"start": 2, "entity": "I-LOC"},
{"start": 9, "entity": "O"},
{"start": 11, "entity": "O"},
{"start": 16, "entity": "B-LOC"},
{"start": 21, "entity": "O"},
]
]
predictions = task_evaluator.predictions_processor(predictions, words, join_by)
self.assertListEqual(predictions["predictions"][0], ["B-LOC", "O", "O", "O", "B-LOC", "O"])
# non-aligned start and words
predictions = [
[
{"start": 0, "entity": "B-LOC"},
{"start": 6, "entity": "I-LOC"},
{"start": 9, "entity": "O"},
{"start": 11, "entity": "O"},
{"start": 16, "entity": "B-LOC"},
{"start": 21, "entity": "O"},
]
]
predictions = task_evaluator.predictions_processor(predictions, words, join_by)
self.assertListEqual(predictions["predictions"][0], ["B-LOC", "O", "O", "O", "B-LOC", "O"])
# non-aligned start and words
predictions = [
[
{"start": 0, "entity": "B-LOC"},
{"start": 9, "entity": "O"},
{"start": 11, "entity": "O"},
{"start": 16, "entity": "B-LOC"},
{"start": 21, "entity": "O"},
]
]
predictions = task_evaluator.predictions_processor(predictions, words, join_by)
self.assertListEqual(predictions["predictions"][0], ["B-LOC", "O", "O", "O", "B-LOC", "O"])
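# --- Editor's hedged sketch (not part of the test file) ---
# A minimal reference implementation of the inclusive (start, end) character
# offsets checked in test_words_to_offsets above; not the library's actual code,
# just a way to see where values like (0, 3), (5, 6), ... come from.
def _reference_words_to_offsets(words, join_by):
    offsets, start = [], 0
    for word in words:
        end = start + len(word) - 1  # inclusive end offset of this word
        offsets.append((start, end))
        start = end + 1 + len(join_by)  # skip the separator before the next word
    return offsets


assert _reference_words_to_offsets(["This", "is", "a", "test", "."], " ") == [
    (0, 3),
    (5, 6),
    (8, 8),
    (10, 13),
    (15, 15),
]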
class TestTextGenerationEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict({"text": ["Lorem ipsum"]})
self.pipe = DummyTextGenerationPipeline(num_return_sequences=4)
self.evaluator = evaluator("text-generation")
def test_class_init(self):
evaluator = TextGenerationEvaluator()
self.assertEqual(evaluator.task, "text-generation")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="word_count",
)
self.assertIsInstance(results["unique_words"], int)
@slow
def test_default_pipe_init(self):
results = self.evaluator.compute(data=self.data)
self.assertIsInstance(results["unique_words"], int)
def test_overwrite_default_metric(self):
word_length = load("word_length")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=word_length,
)
self.assertIsInstance(results["average_word_length"], int)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="word_length",
)
self.assertIsInstance(results["average_word_length"], int)
def test_process_predictions_multiple_return_sequences(self):
processed_predictions = self.evaluator.predictions_processor(
[
[{"generated_text": "A"}, {"generated_text": "B"}],
[{"generated_text": "C"}, {"generated_text": "D"}],
]
)
self.assertEqual(processed_predictions, {"data": ["A", "B", "C", "D"]})
class TestText2TextGenerationEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict(
{
"text": ["Lorem ipsum"] * 4,
"label": ["Ipsum Lorem"] * 4,
}
)
self.pipe = DummyText2TextGenerationPipeline()
self.evaluator = evaluator("text2text-generation")
def test_pipe_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
)
self.assertEqual(results["bleu"], 0)
def test_class_init(self):
evaluator = Text2TextGenerationEvaluator()
self.assertEqual(evaluator.task, "text2text-generation")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="bleu",
)
self.assertEqual(results["bleu"], 0)
@slow
def test_default_pipe_init(self):
results = self.evaluator.compute(data=self.data)
self.assertEqual(results["bleu"], 0)
def test_overwrite_default_metric(self):
rouge = load("rouge")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=rouge,
)
self.assertEqual(results["rouge1"], 1.0)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="rouge",
)
self.assertEqual(results["rouge1"], 1.0)
def test_summarization(self):
pipe = DummyText2TextGenerationPipeline(task="summarization", prefix="summary")
e = evaluator("summarization")
results = e.compute(
model_or_pipeline=pipe,
data=self.data,
)
self.assertEqual(results["rouge1"], 1.0)
def test_translation(self):
pipe = DummyText2TextGenerationPipeline(task="translation", prefix="translation")
e = evaluator("translation")
results = e.compute(
model_or_pipeline=pipe,
data=self.data,
)
self.assertEqual(results["bleu"], 0)
class TestAutomaticSpeechRecognitionEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict(
{
"path": [
# Examples copied from default speech model of
# `automatic-speech-recognition` pipeline:
# https://huggingface.co/facebook/wav2vec2-base-960h
# https://github.com/huggingface/transformers/blob/main/src/transformers/pipelines/__init__.py#L161
"https://cdn-media.huggingface.co/speech_samples/sample1.flac",
"https://cdn-media.huggingface.co/speech_samples/sample2.flac",
],
"sentence": ["Ipsum Lorem"] * 2,
}
)
self.pipe = DummyAutomaticSpeechRecognitionPipeline()
self.evaluator = evaluator("automatic-speech-recognition")
def test_pipe_init(self):
print(self.evaluator)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
)
print(results)
self.assertEqual(results["wer"], 1.0)
def test_class_init(self):
evaluator = AutomaticSpeechRecognitionEvaluator()
self.assertEqual(evaluator.task, "automatic-speech-recognition")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="wer",
)
self.assertEqual(results["wer"], 1.0)
@slow
def test_default_pipe_init(self):
results = self.evaluator.compute(data=self.data)
self.assertGreater(results["wer"], 1.0)
def test_overwrite_default_metric(self):
cer = load("cer")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=cer,
)
self.assertEqual(results["cer"], 0.7272727272727273)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="cer",
)
self.assertEqual(results["cer"], 0.7272727272727273)
class TestAudioClassificationEvaluator(TestCase):
def setUp(self):
self.data = Dataset.from_dict(
{"file": ["https://huggingface.co/datasets/Narsil/asr_dummy/resolve/main/1.flac"], "label": [11]}
)
self.raw_data = Dataset.from_dict(
{
"audio": [
np.array(
[-0.00048828, -0.00018311, -0.00137329, 0.00079346, 0.00091553, 0.00085449], dtype=np.float32
)
],
"label": [11],
}
)
self.default_model = "superb/wav2vec2-base-superb-ks"
self.pipe = DummyAudioClassificationPipeline()
self.evaluator = evaluator("audio-classification")
self.label_mapping = AutoConfig.from_pretrained(self.default_model).label2id
def test_pipe_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
def test_raw_pipe_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.pipe, data=self.raw_data, label_mapping=self.label_mapping, input_column="audio"
)
self.assertEqual(results["accuracy"], 0)
@slow
def test_model_init(self):
results = self.evaluator.compute(
model_or_pipeline=self.default_model,
data=self.data,
metric="accuracy",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
model = AutoModelForAudioClassification.from_pretrained(self.default_model)
feature_extractor = AutoFeatureExtractor.from_pretrained(self.default_model)
results = self.evaluator.compute(
model_or_pipeline=model,
data=self.data,
metric="accuracy",
feature_extractor=feature_extractor,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
def test_class_init(self):
evaluator = AudioClassificationEvaluator()
self.assertEqual(evaluator.task, "audio-classification")
self.assertIsNone(evaluator.default_metric_name)
results = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="accuracy",
label_mapping=self.label_mapping,
)
results_raw = evaluator.compute(
model_or_pipeline=self.pipe,
data=self.raw_data,
label_mapping=self.label_mapping,
metric="accuracy",
input_column="audio",
)
self.assertEqual(results_raw["accuracy"], 0)
self.assertEqual(results["accuracy"], 0)
@slow
def test_default_pipe_init(self):
results = self.evaluator.compute(
data=self.data,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
def test_overwrite_default_metric(self):
accuracy = load("accuracy")
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric=accuracy,
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
results = self.evaluator.compute(
model_or_pipeline=self.pipe,
data=self.data,
metric="accuracy",
label_mapping=self.label_mapping,
)
self.assertEqual(results["accuracy"], 0)
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from evaluate.utils.file_utils import OfflineModeIsEnabled, cached_path, ftp_get, ftp_head, http_get, http_head
FILE_CONTENT = """\
Text data.
Second line of data."""
def test_cached_path_local(text_file):
# absolute path
text_file = str(Path(text_file).resolve())
assert cached_path(text_file) == text_file
# relative path
text_file = str(Path(__file__).resolve().relative_to(Path(os.getcwd())))
assert cached_path(text_file) == text_file
def test_cached_path_missing_local(tmp_path):
# absolute path
missing_file = str(tmp_path.resolve() / "__missing_file__.txt")
with pytest.raises(FileNotFoundError):
cached_path(missing_file)
# relative path
missing_file = "./__missing_file__.txt"
with pytest.raises(FileNotFoundError):
cached_path(missing_file)
@patch("evaluate.config.HF_EVALUATE_OFFLINE", True)
def test_cached_path_offline():
with pytest.raises(OfflineModeIsEnabled):
cached_path("https://huggingface.co")
@patch("evaluate.config.HF_EVALUATE_OFFLINE", True)
def test_http_offline(tmp_path_factory):
filename = tmp_path_factory.mktemp("data") / "file.html"
with pytest.raises(OfflineModeIsEnabled):
http_get("https://huggingface.co", temp_file=filename)
with pytest.raises(OfflineModeIsEnabled):
http_head("https://huggingface.co")
@patch("evaluate.config.HF_EVALUATE_OFFLINE", True)
def test_ftp_offline(tmp_path_factory):
filename = tmp_path_factory.mktemp("data") / "file.html"
with pytest.raises(OfflineModeIsEnabled):
ftp_get("ftp://huggingface.co", temp_file=filename)
with pytest.raises(OfflineModeIsEnabled):
ftp_head("ftp://huggingface.co")
import glob
from unittest import TestCase
from unittest.mock import patch
import pytest
import requests
import yaml
from evaluate.hub import push_to_hub
from tests.test_metric import DummyMetric
minimum_metadata = {
"model-index": [
{
"results": [
{
"task": {"type": "dummy-task"},
"dataset": {"type": "dataset_type", "name": "dataset_name"},
"metrics": [
{"type": "dummy_metric", "value": 1.0, "name": "Pretty Metric Name"},
],
}
]
}
]
}
extras_metadata = {
"model-index": [
{
"results": [
{
"task": {"type": "dummy-task", "name": "task_name"},
"dataset": {
"type": "dataset_type",
"name": "dataset_name",
"config": "fr",
"split": "test",
"revision": "abc",
"args": {"a": 1, "b": 2},
},
"metrics": [
{
"type": "dummy_metric",
"value": 1.0,
"name": "Pretty Metric Name",
"config": "default",
"args": {"hello": 1, "world": 2},
},
],
}
]
}
]
}
@patch("evaluate.hub.HF_HUB_ALLOWED_TASKS", ["dummy-task"])
@patch("evaluate.hub.dataset_info", lambda x: True)
@patch("evaluate.hub.model_info", lambda x: True)
@patch("evaluate.hub.metadata_update")
class TestHub(TestCase):
@pytest.fixture(autouse=True)
def inject_fixtures(self, caplog):
self._caplog = caplog
def setUp(self):
self.metric = DummyMetric()
self.metric.add()
self.args = {"hello": 1, "world": 2}
self.result = self.metric.compute()
def test_push_metric_required_arguments(self, metadata_update):
push_to_hub(
model_id="username/repo",
metric_value=self.result["accuracy"],
metric_name="Pretty Metric Name",
metric_type=self.metric.name,
dataset_name="dataset_name",
dataset_type="dataset_type",
task_type="dummy-task",
)
metadata_update.assert_called_once_with(repo_id="username/repo", metadata=minimum_metadata, overwrite=False)
def test_push_metric_missing_arguments(self, metadata_update):
with pytest.raises(TypeError):
push_to_hub(
model_id="username/repo",
metric_value=self.result["accuracy"],
metric_name="Pretty Metric Name",
metric_type=self.metric.name,
dataset_name="dataset_name",
dataset_type="dummy-task",
)
def test_push_metric_invalid_arguments(self, metadata_update):
with pytest.raises(TypeError):
push_to_hub(
model_id="username/repo",
metric_value=self.result["accuracy"],
metric_name="Pretty Metric Name",
metric_type=self.metric.name,
dataset_name="dataset_name",
dataset_type="dataset_type",
task_type="dummy-task",
random_value="incorrect",
)
def test_push_metric_extra_arguments(self, metadata_update):
push_to_hub(
model_id="username/repo",
metric_value=self.result["accuracy"],
metric_name="Pretty Metric Name",
metric_type=self.metric.name,
dataset_name="dataset_name",
dataset_type="dataset_type",
dataset_config="fr",
dataset_split="test",
dataset_revision="abc",
dataset_args={"a": 1, "b": 2},
task_type="dummy-task",
task_name="task_name",
metric_config=self.metric.config_name,
metric_args=self.args,
)
metadata_update.assert_called_once_with(repo_id="username/repo", metadata=extras_metadata, overwrite=False)
def test_push_metric_invalid_task_type(self, metadata_update):
with pytest.raises(ValueError):
push_to_hub(
model_id="username/repo",
metric_value=self.result["accuracy"],
metric_name="Pretty Metric Name",
metric_type=self.metric.name,
dataset_name="dataset_name",
dataset_type="dataset_type",
task_type="audio-classification",
)
def test_push_metric_invalid_dataset_type(self, metadata_update):
with patch("evaluate.hub.dataset_info") as mock_dataset_info:
mock_dataset_info.side_effect = requests.HTTPError()
push_to_hub(
model_id="username/repo",
metric_value=self.result["accuracy"],
metric_name="Pretty Metric Name",
metric_type=self.metric.name,
dataset_name="dataset_name",
dataset_type="dataset_type",
task_type="dummy-task",
)
assert "Dataset dataset_type not found on the Hub at hf.co/datasets/dataset_type" in self._caplog.text
metadata_update.assert_called_once_with(
repo_id="username/repo", metadata=minimum_metadata, overwrite=False
)
def test_push_metric_invalid_model_id(self, metadata_update):
with patch("evaluate.hub.model_info") as mock_model_info:
mock_model_info.side_effect = requests.HTTPError()
with pytest.raises(ValueError):
push_to_hub(
model_id="username/bad-repo",
metric_value=self.result["accuracy"],
metric_name="Pretty Metric Name",
metric_type=self.metric.name,
dataset_name="dataset_name",
dataset_type="dataset_type",
task_type="dummy-task",
)
class ValidateYaml(TestCase):
def setUp(self):
pass
def testLoadingCards(self):
readme_filepaths = []
for glob_path in ["measurements/*/README.md", "metrics/*/README.md", "comparisons/*/README.md"]:
readme_filepaths.extend(glob.glob(glob_path))
for readme_file in readme_filepaths:
with open(readme_file, encoding="utf8") as f_yaml:
x = yaml.safe_load_all(f_yaml)
self.assertIsInstance(next(x), dict)
import importlib
import os
import tempfile
from unittest import TestCase
import pytest
from datasets import DownloadConfig
import evaluate
from evaluate.loading import (
CachedEvaluationModuleFactory,
HubEvaluationModuleFactory,
LocalEvaluationModuleFactory,
evaluation_module_factory,
)
from .utils import OfflineSimulationMode, offline
SAMPLE_METRIC_IDENTIFIER = "lvwerra/test"
METRIC_LOADING_SCRIPT_NAME = "__dummy_metric1__"
METRIC_LOADING_SCRIPT_CODE = """
import evaluate
from evaluate import EvaluationModuleInfo
from datasets import Features, Value
class __DummyMetric1__(evaluate.EvaluationModule):
def _info(self):
        return EvaluationModuleInfo(features=Features({"predictions": Value("int64"), "references": Value("int64")}))
def _compute(self, predictions, references):
return {"__dummy_metric1__": sum(int(p == r) for p, r in zip(predictions, references))}
"""
@pytest.fixture
def metric_loading_script_dir(tmp_path):
script_name = METRIC_LOADING_SCRIPT_NAME
script_dir = tmp_path / script_name
script_dir.mkdir()
script_path = script_dir / f"{script_name}.py"
with open(script_path, "w") as f:
f.write(METRIC_LOADING_SCRIPT_CODE)
return str(script_dir)
class ModuleFactoryTest(TestCase):
@pytest.fixture(autouse=True)
def inject_fixtures(self, metric_loading_script_dir):
self._metric_loading_script_dir = metric_loading_script_dir
def setUp(self):
self.hf_modules_cache = tempfile.mkdtemp()
self.cache_dir = tempfile.mkdtemp()
self.download_config = DownloadConfig(cache_dir=self.cache_dir)
self.dynamic_modules_path = evaluate.loading.init_dynamic_modules(
name="test_datasets_modules_" + os.path.basename(self.hf_modules_cache),
hf_modules_cache=self.hf_modules_cache,
)
def test_HubEvaluationModuleFactory_with_internal_import(self):
# "squad_v2" requires additional imports (internal)
factory = HubEvaluationModuleFactory(
"evaluate-metric/squad_v2",
module_type="metric",
download_config=self.download_config,
dynamic_modules_path=self.dynamic_modules_path,
)
module_factory_result = factory.get_module()
assert importlib.import_module(module_factory_result.module_path) is not None
def test_HubEvaluationModuleFactory_with_external_import(self):
# "bleu" requires additional imports (external from github)
factory = HubEvaluationModuleFactory(
"evaluate-metric/bleu",
module_type="metric",
download_config=self.download_config,
dynamic_modules_path=self.dynamic_modules_path,
)
module_factory_result = factory.get_module()
assert importlib.import_module(module_factory_result.module_path) is not None
def test_HubEvaluationModuleFactoryWithScript(self):
factory = HubEvaluationModuleFactory(
SAMPLE_METRIC_IDENTIFIER,
download_config=self.download_config,
dynamic_modules_path=self.dynamic_modules_path,
)
module_factory_result = factory.get_module()
assert importlib.import_module(module_factory_result.module_path) is not None
def test_LocalMetricModuleFactory(self):
path = os.path.join(self._metric_loading_script_dir, f"{METRIC_LOADING_SCRIPT_NAME}.py")
factory = LocalEvaluationModuleFactory(
path, download_config=self.download_config, dynamic_modules_path=self.dynamic_modules_path
)
module_factory_result = factory.get_module()
assert importlib.import_module(module_factory_result.module_path) is not None
def test_CachedMetricModuleFactory(self):
path = os.path.join(self._metric_loading_script_dir, f"{METRIC_LOADING_SCRIPT_NAME}.py")
factory = LocalEvaluationModuleFactory(
path, download_config=self.download_config, dynamic_modules_path=self.dynamic_modules_path
)
module_factory_result = factory.get_module()
for offline_mode in OfflineSimulationMode:
with offline(offline_mode):
factory = CachedEvaluationModuleFactory(
METRIC_LOADING_SCRIPT_NAME,
dynamic_modules_path=self.dynamic_modules_path,
)
module_factory_result = factory.get_module()
assert importlib.import_module(module_factory_result.module_path) is not None
def test_cache_with_remote_canonical_module(self):
metric = "accuracy"
evaluation_module_factory(
metric, download_config=self.download_config, dynamic_modules_path=self.dynamic_modules_path
)
for offline_mode in OfflineSimulationMode:
with offline(offline_mode):
evaluation_module_factory(
metric, download_config=self.download_config, dynamic_modules_path=self.dynamic_modules_path
)
def test_cache_with_remote_community_module(self):
metric = "lvwerra/test"
evaluation_module_factory(
metric, download_config=self.download_config, dynamic_modules_path=self.dynamic_modules_path
)
for offline_mode in OfflineSimulationMode:
with offline(offline_mode):
evaluation_module_factory(
metric, download_config=self.download_config, dynamic_modules_path=self.dynamic_modules_path
)
import os
import pickle
import tempfile
import time
from multiprocessing import Pool
from unittest import TestCase, mock
import pytest
from datasets.features import Features, Sequence, Value
from evaluate.module import EvaluationModule, EvaluationModuleInfo, combine
from .utils import require_tf, require_torch
class DummyMetric(EvaluationModule):
def _info(self):
return EvaluationModuleInfo(
description="dummy metric for tests",
citation="insert citation here",
features=Features({"predictions": Value("int64"), "references": Value("int64")}),
)
def _compute(self, predictions, references):
result = {}
if not predictions:
return result
else:
result["accuracy"] = sum(i == j for i, j in zip(predictions, references)) / len(predictions)
try:
result["set_equality"] = set(predictions) == set(references)
except TypeError:
result["set_equality"] = None
return result
@classmethod
def predictions_and_references(cls):
return ([1, 2, 3, 4], [1, 2, 4, 3])
@classmethod
def predictions_and_references_strings(cls):
return (["a", "b", "c", "d"], ["a", "b", "d", "c"])
@classmethod
def expected_results(cls):
return {"accuracy": 0.5, "set_equality": True}
@classmethod
def other_predictions_and_references(cls):
return ([1, 3, 4, 5], [1, 2, 3, 4])
@classmethod
def other_expected_results(cls):
return {"accuracy": 0.25, "set_equality": False}
@classmethod
def distributed_predictions_and_references(cls):
return ([1, 2, 3, 4], [1, 2, 3, 4]), ([1, 2, 4, 5], [1, 2, 3, 4])
@classmethod
def distributed_expected_results(cls):
return {"accuracy": 0.75, "set_equality": False}
@classmethod
def separate_predictions_and_references(cls):
return ([1, 2, 3, 4], [1, 2, 3, 4]), ([1, 2, 4, 5], [1, 2, 3, 4])
@classmethod
def separate_expected_results(cls):
return [{"accuracy": 1.0, "set_equality": True}, {"accuracy": 0.5, "set_equality": False}]
class AnotherDummyMetric(EvaluationModule):
def _info(self):
return EvaluationModuleInfo(
description="another dummy metric for tests",
citation="insert citation here",
features=Features({"predictions": Value("int64"), "references": Value("int64")}),
)
def _compute(self, predictions, references):
return {"set_equality": False}
@classmethod
def expected_results(cls):
return {"set_equality": False}
def properly_del_metric(metric):
"""properly delete a metric on windows if the process is killed during multiprocessing"""
if metric is not None:
if metric.filelock is not None:
metric.filelock.release()
if metric.rendez_vous_lock is not None:
metric.rendez_vous_lock.release()
del metric.writer
del metric.data
del metric
def metric_compute(arg):
"""Thread worker function for distributed evaluation testing.
On base level to be pickable.
"""
metric = None
try:
num_process, process_id, preds, refs, exp_id, cache_dir, wait = arg
metric = DummyMetric(
num_process=num_process, process_id=process_id, experiment_id=exp_id, cache_dir=cache_dir, timeout=5
)
time.sleep(wait)
results = metric.compute(predictions=preds, references=refs)
return results
finally:
properly_del_metric(metric)
def metric_add_batch_and_compute(arg):
"""Thread worker function for distributed evaluation testing.
On base level to be pickable.
"""
metric = None
try:
num_process, process_id, preds, refs, exp_id, cache_dir, wait = arg
metric = DummyMetric(
num_process=num_process, process_id=process_id, experiment_id=exp_id, cache_dir=cache_dir, timeout=5
)
metric.add_batch(predictions=preds, references=refs)
time.sleep(wait)
results = metric.compute()
return results
finally:
properly_del_metric(metric)
def metric_add_and_compute(arg):
"""Thread worker function for distributed evaluation testing.
On base level to be pickable.
"""
metric = None
try:
num_process, process_id, preds, refs, exp_id, cache_dir, wait = arg
metric = DummyMetric(
num_process=num_process, process_id=process_id, experiment_id=exp_id, cache_dir=cache_dir, timeout=5
)
for pred, ref in zip(preds, refs):
metric.add(prediction=pred, reference=ref)
time.sleep(wait)
results = metric.compute()
return results
finally:
properly_del_metric(metric)
class TestMetric(TestCase):
def test_dummy_metric(self):
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
metric = DummyMetric(experiment_id="test_dummy_metric")
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
metric = DummyMetric(experiment_id="test_dummy_metric")
metric.add_batch(predictions=preds, references=refs)
self.assertDictEqual(expected_results, metric.compute())
del metric
metric = DummyMetric(experiment_id="test_dummy_metric")
for pred, ref in zip(preds, refs):
metric.add(prediction=pred, reference=ref)
self.assertDictEqual(expected_results, metric.compute())
del metric
# With keep_in_memory
metric = DummyMetric(keep_in_memory=True, experiment_id="test_dummy_metric")
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
metric = DummyMetric(keep_in_memory=True, experiment_id="test_dummy_metric")
metric.add_batch(predictions=preds, references=refs)
self.assertDictEqual(expected_results, metric.compute())
del metric
metric = DummyMetric(keep_in_memory=True, experiment_id="test_dummy_metric")
for pred, ref in zip(preds, refs):
metric.add(prediction=pred, reference=ref)
self.assertDictEqual(expected_results, metric.compute())
del metric
metric = DummyMetric(keep_in_memory=True, experiment_id="test_dummy_metric")
self.assertDictEqual({}, metric.compute(predictions=[], references=[]))
del metric
metric = DummyMetric(keep_in_memory=True, experiment_id="test_dummy_metric")
with self.assertRaisesRegex(ValueError, "Mismatch in the number"):
metric.add_batch(predictions=[1, 2, 3], references=[1, 2, 3, 4])
del metric
def test_metric_with_cache_dir(self):
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
with tempfile.TemporaryDirectory() as tmp_dir:
metric = DummyMetric(experiment_id="test_dummy_metric", cache_dir=tmp_dir)
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
def test_concurrent_metrics(self):
preds, refs = DummyMetric.predictions_and_references()
other_preds, other_refs = DummyMetric.other_predictions_and_references()
expected_results = DummyMetric.expected_results()
other_expected_results = DummyMetric.other_expected_results()
metric = DummyMetric(experiment_id="test_concurrent_metrics")
other_metric = DummyMetric(
experiment_id="test_concurrent_metrics",
)
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
self.assertDictEqual(
other_expected_results, other_metric.compute(predictions=other_preds, references=other_refs)
)
del metric, other_metric
metric = DummyMetric(
experiment_id="test_concurrent_metrics",
)
other_metric = DummyMetric(
experiment_id="test_concurrent_metrics",
)
metric.add_batch(predictions=preds, references=refs)
other_metric.add_batch(predictions=other_preds, references=other_refs)
self.assertDictEqual(expected_results, metric.compute())
self.assertDictEqual(other_expected_results, other_metric.compute())
for pred, ref, other_pred, other_ref in zip(preds, refs, other_preds, other_refs):
metric.add(prediction=pred, reference=ref)
other_metric.add(prediction=other_pred, reference=other_ref)
self.assertDictEqual(expected_results, metric.compute())
self.assertDictEqual(other_expected_results, other_metric.compute())
del metric, other_metric
# With keep_in_memory
metric = DummyMetric(experiment_id="test_concurrent_metrics", keep_in_memory=True)
other_metric = DummyMetric(experiment_id="test_concurrent_metrics", keep_in_memory=True)
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
self.assertDictEqual(
other_expected_results, other_metric.compute(predictions=other_preds, references=other_refs)
)
metric = DummyMetric(experiment_id="test_concurrent_metrics", keep_in_memory=True)
other_metric = DummyMetric(experiment_id="test_concurrent_metrics", keep_in_memory=True)
metric.add_batch(predictions=preds, references=refs)
other_metric.add_batch(predictions=other_preds, references=other_refs)
self.assertDictEqual(expected_results, metric.compute())
self.assertDictEqual(other_expected_results, other_metric.compute())
for pred, ref, other_pred, other_ref in zip(preds, refs, other_preds, other_refs):
metric.add(prediction=pred, reference=ref)
other_metric.add(prediction=other_pred, reference=other_ref)
self.assertDictEqual(expected_results, metric.compute())
self.assertDictEqual(other_expected_results, other_metric.compute())
del metric, other_metric
def test_separate_experiments_in_parallel(self):
with tempfile.TemporaryDirectory() as tmp_dir:
(preds_0, refs_0), (preds_1, refs_1) = DummyMetric.separate_predictions_and_references()
expected_results = DummyMetric.separate_expected_results()
pool = Pool(processes=2)
results = pool.map(
metric_compute,
[
(1, 0, preds_0, refs_0, None, tmp_dir, 0),
(1, 0, preds_1, refs_1, None, tmp_dir, 0),
],
)
self.assertDictEqual(expected_results[0], results[0])
self.assertDictEqual(expected_results[1], results[1])
del results
            # wait more than one second so that the second metric has to sample a new hashing name
results = pool.map(
metric_compute,
[
(1, 0, preds_0, refs_0, None, tmp_dir, 2),
(1, 0, preds_1, refs_1, None, tmp_dir, 2),
],
)
self.assertDictEqual(expected_results[0], results[0])
self.assertDictEqual(expected_results[1], results[1])
del results
results = pool.map(
metric_add_and_compute,
[
(1, 0, preds_0, refs_0, None, tmp_dir, 0),
(1, 0, preds_1, refs_1, None, tmp_dir, 0),
],
)
self.assertDictEqual(expected_results[0], results[0])
self.assertDictEqual(expected_results[1], results[1])
del results
results = pool.map(
metric_add_batch_and_compute,
[
(1, 0, preds_0, refs_0, None, tmp_dir, 0),
(1, 0, preds_1, refs_1, None, tmp_dir, 0),
],
)
self.assertDictEqual(expected_results[0], results[0])
self.assertDictEqual(expected_results[1], results[1])
del results
def test_distributed_metrics(self):
with tempfile.TemporaryDirectory() as tmp_dir:
(preds_0, refs_0), (preds_1, refs_1) = DummyMetric.distributed_predictions_and_references()
expected_results = DummyMetric.distributed_expected_results()
pool = Pool(processes=4)
results = pool.map(
metric_compute,
[
(2, 0, preds_0, refs_0, "test_distributed_metrics_0", tmp_dir, 0),
(2, 1, preds_1, refs_1, "test_distributed_metrics_0", tmp_dir, 0.5),
],
)
self.assertDictEqual(expected_results, results[0])
self.assertIsNone(results[1])
del results
results = pool.map(
metric_compute,
[
(2, 0, preds_0, refs_0, "test_distributed_metrics_0", tmp_dir, 0.5),
(2, 1, preds_1, refs_1, "test_distributed_metrics_0", tmp_dir, 0),
],
)
self.assertDictEqual(expected_results, results[0])
self.assertIsNone(results[1])
del results
results = pool.map(
metric_add_and_compute,
[
(2, 0, preds_0, refs_0, "test_distributed_metrics_1", tmp_dir, 0),
(2, 1, preds_1, refs_1, "test_distributed_metrics_1", tmp_dir, 0),
],
)
self.assertDictEqual(expected_results, results[0])
self.assertIsNone(results[1])
del results
results = pool.map(
metric_add_batch_and_compute,
[
(2, 0, preds_0, refs_0, "test_distributed_metrics_2", tmp_dir, 0),
(2, 1, preds_1, refs_1, "test_distributed_metrics_2", tmp_dir, 0),
],
)
self.assertDictEqual(expected_results, results[0])
self.assertIsNone(results[1])
del results
            # To use several distributed metrics on the same local file system, an experiment_id needs to be specified
try:
results = pool.map(
metric_add_and_compute,
[
(2, 0, preds_0, refs_0, "test_distributed_metrics_3", tmp_dir, 0),
(2, 1, preds_1, refs_1, "test_distributed_metrics_3", tmp_dir, 0),
(2, 0, preds_0, refs_0, "test_distributed_metrics_3", tmp_dir, 0),
(2, 1, preds_1, refs_1, "test_distributed_metrics_3", tmp_dir, 0),
],
)
except ValueError:
                # We are fine with either raising a ValueError or computing the metric correctly
                # Making sure the error is raised would mean making the dummy dataset bigger
                # and the test longer...
pass
else:
self.assertDictEqual(expected_results, results[0])
self.assertDictEqual(expected_results, results[2])
self.assertIsNone(results[1])
self.assertIsNone(results[3])
del results
results = pool.map(
metric_add_and_compute,
[
(2, 0, preds_0, refs_0, "exp_0", tmp_dir, 0),
(2, 1, preds_1, refs_1, "exp_0", tmp_dir, 0),
(2, 0, preds_0, refs_0, "exp_1", tmp_dir, 0),
(2, 1, preds_1, refs_1, "exp_1", tmp_dir, 0),
],
)
self.assertDictEqual(expected_results, results[0])
self.assertDictEqual(expected_results, results[2])
self.assertIsNone(results[1])
self.assertIsNone(results[3])
del results
            # keep_in_memory is not allowed in distributed settings
with self.assertRaises(ValueError):
DummyMetric(
experiment_id="test_distributed_metrics_4",
keep_in_memory=True,
num_process=2,
process_id=0,
cache_dir=tmp_dir,
)
def test_dummy_metric_pickle(self):
with tempfile.TemporaryDirectory() as tmp_dir:
tmp_file = os.path.join(tmp_dir, "metric.pt")
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
metric = DummyMetric(experiment_id="test_dummy_metric_pickle")
with open(tmp_file, "wb") as f:
pickle.dump(metric, f)
del metric
with open(tmp_file, "rb") as f:
metric = pickle.load(f)
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
def test_input_numpy(self):
import numpy as np
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
preds, refs = np.array(preds), np.array(refs)
metric = DummyMetric(experiment_id="test_input_numpy")
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
metric = DummyMetric(experiment_id="test_input_numpy")
metric.add_batch(predictions=preds, references=refs)
self.assertDictEqual(expected_results, metric.compute())
del metric
metric = DummyMetric(experiment_id="test_input_numpy")
for pred, ref in zip(preds, refs):
metric.add(prediction=pred, reference=ref)
self.assertDictEqual(expected_results, metric.compute())
del metric
@require_torch
def test_input_torch(self):
import torch
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
preds, refs = torch.tensor(preds), torch.tensor(refs)
metric = DummyMetric(experiment_id="test_input_torch")
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
metric = DummyMetric(experiment_id="test_input_torch")
metric.add_batch(predictions=preds, references=refs)
self.assertDictEqual(expected_results, metric.compute())
del metric
metric = DummyMetric(experiment_id="test_input_torch")
for pred, ref in zip(preds, refs):
metric.add(prediction=pred, reference=ref)
self.assertDictEqual(expected_results, metric.compute())
del metric
@require_tf
def test_input_tf(self):
import tensorflow as tf
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
preds, refs = tf.constant(preds), tf.constant(refs)
metric = DummyMetric(experiment_id="test_input_tf")
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
metric = DummyMetric(experiment_id="test_input_tf")
metric.add_batch(predictions=preds, references=refs)
self.assertDictEqual(expected_results, metric.compute())
del metric
metric = DummyMetric(experiment_id="test_input_tf")
for pred, ref in zip(preds, refs):
metric.add(prediction=pred, reference=ref)
self.assertDictEqual(expected_results, metric.compute())
del metric
def test_string_casting(self):
metric = DummyMetric(experiment_id="test_string_casting")
metric.info.features = Features({"predictions": Value("string"), "references": Value("string")})
metric.compute(predictions=["a"], references=["a"])
with self.assertRaises(ValueError):
metric.compute(predictions=[1], references=[1])
metric = DummyMetric(experiment_id="test_string_casting_2")
metric.info.features = Features(
{"predictions": Sequence(Value("string")), "references": Sequence(Value("string"))}
)
metric.compute(predictions=[["a"]], references=[["a"]])
with self.assertRaises(ValueError):
metric.compute(predictions=["a"], references=["a"])
def test_string_casting_tested_once(self):
self.counter = 0
def checked_fct(fct): # wrapper function that increases a counter on each call
def wrapped(*args, **kwargs):
self.counter += 1
return fct(*args, **kwargs)
return wrapped
with mock.patch(
"evaluate.EvaluationModule._enforce_nested_string_type",
checked_fct(DummyMetric._enforce_nested_string_type),
):
metric = DummyMetric(experiment_id="test_string_casting_called_once")
metric.info.features = Features(
{"references": Sequence(Value("string")), "predictions": Sequence(Value("string"))}
)
refs = [["test"] * 10] * 10
preds = [["test"] * 10] * 10
metric.add_batch(references=refs, predictions=preds)
metric.add_batch(references=refs, predictions=preds)
# the function is called twice for every batch's input: once on the
            # sequence and then recursively again on the first input of the sequence
self.assertEqual(self.counter, 8)
def test_multiple_features(self):
metric = DummyMetric()
metric.info.features = [
Features({"predictions": Value("int64"), "references": Value("int64")}),
Features({"predictions": Value("string"), "references": Value("string")}),
]
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
metric.info.features = [
Features({"predictions": Value("string"), "references": Value("string")}),
Features({"predictions": Value("int64"), "references": Value("int64")}),
]
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
self.assertDictEqual(expected_results, metric.compute(predictions=preds, references=refs))
del metric
class MetricWithMultiLabel(EvaluationModule):
def _info(self):
return EvaluationModuleInfo(
description="dummy metric for tests",
citation="insert citation here",
features=Features(
{"predictions": Sequence(Value("int64")), "references": Sequence(Value("int64"))}
if self.config_name == "multilabel"
else {"predictions": Value("int64"), "references": Value("int64")}
),
)
def _compute(self, predictions=None, references=None):
return (
{
"accuracy": sum(i == j for i, j in zip(predictions, references)) / len(predictions),
}
if predictions
else {}
)
@pytest.mark.parametrize(
"config_name, predictions, references, expected",
[
(None, [1, 2, 3, 4], [1, 2, 4, 3], 0.5), # Multiclass: Value("int64")
(
"multilabel",
[[1, 0], [1, 0], [1, 0], [1, 0]],
[[1, 0], [0, 1], [1, 1], [0, 0]],
0.25,
), # Multilabel: Sequence(Value("int64"))
],
)
def test_metric_with_multilabel(config_name, predictions, references, expected, tmp_path):
cache_dir = tmp_path / "cache"
metric = MetricWithMultiLabel(config_name, cache_dir=cache_dir)
results = metric.compute(predictions=predictions, references=references)
assert results["accuracy"] == expected
def test_safety_checks_process_vars():
with pytest.raises(ValueError):
_ = DummyMetric(process_id=-2)
with pytest.raises(ValueError):
_ = DummyMetric(num_process=2, process_id=3)
class AccuracyWithNonStandardFeatureNames(EvaluationModule):
def _info(self):
return EvaluationModuleInfo(
description="dummy metric for tests",
citation="insert citation here",
features=Features({"inputs": Value("int64"), "targets": Value("int64")}),
)
def _compute(self, inputs, targets):
return (
{
"accuracy": sum(i == j for i, j in zip(inputs, targets)) / len(targets),
}
if targets
else {}
)
@classmethod
def inputs_and_targets(cls):
return ([1, 2, 3, 4], [1, 2, 4, 3])
@classmethod
def expected_results(cls):
return {"accuracy": 0.5}
def test_metric_with_non_standard_feature_names_add(tmp_path):
cache_dir = tmp_path / "cache"
inputs, targets = AccuracyWithNonStandardFeatureNames.inputs_and_targets()
metric = AccuracyWithNonStandardFeatureNames(cache_dir=cache_dir)
for input, target in zip(inputs, targets):
metric.add(inputs=input, targets=target)
results = metric.compute()
assert results == AccuracyWithNonStandardFeatureNames.expected_results()
def test_metric_with_non_standard_feature_names_add_batch(tmp_path):
cache_dir = tmp_path / "cache"
inputs, targets = AccuracyWithNonStandardFeatureNames.inputs_and_targets()
metric = AccuracyWithNonStandardFeatureNames(cache_dir=cache_dir)
metric.add_batch(inputs=inputs, targets=targets)
results = metric.compute()
assert results == AccuracyWithNonStandardFeatureNames.expected_results()
def test_metric_with_non_standard_feature_names_compute(tmp_path):
cache_dir = tmp_path / "cache"
inputs, targets = AccuracyWithNonStandardFeatureNames.inputs_and_targets()
metric = AccuracyWithNonStandardFeatureNames(cache_dir=cache_dir)
results = metric.compute(inputs=inputs, targets=targets)
assert results == AccuracyWithNonStandardFeatureNames.expected_results()
class TestEvaluationCombinedEvaluation(TestCase):
def test_single_module(self):
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
combined_evaluation = combine([DummyMetric()])
self.assertDictEqual(expected_results, combined_evaluation.compute(predictions=preds, references=refs))
def test_add(self):
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
combined_evaluation = combine([DummyMetric()])
for pred, ref in zip(preds, refs):
combined_evaluation.add(pred, ref)
self.assertDictEqual(expected_results, combined_evaluation.compute())
def test_add_batch(self):
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
combined_evaluation = combine([DummyMetric()])
combined_evaluation.add_batch(predictions=preds, references=refs)
self.assertDictEqual(expected_results, combined_evaluation.compute())
def test_force_prefix_with_dict(self):
prefix = "test_prefix"
preds, refs = DummyMetric.predictions_and_references()
expected_results = DummyMetric.expected_results()
expected_results[f"{prefix}_accuracy"] = expected_results.pop("accuracy")
expected_results[f"{prefix}_set_equality"] = expected_results.pop("set_equality")
combined_evaluation = combine({prefix: DummyMetric()}, force_prefix=True)
self.assertDictEqual(expected_results, combined_evaluation.compute(predictions=preds, references=refs))
def test_duplicate_module(self):
preds, refs = DummyMetric.predictions_and_references()
dummy_metric = DummyMetric()
dummy_result = DummyMetric.expected_results()
combined_evaluation = combine([dummy_metric, dummy_metric])
expected_results = {}
for i in range(2):
for k in dummy_result:
expected_results[f"{dummy_metric.name}_{i}_{k}"] = dummy_result[k]
self.assertDictEqual(expected_results, combined_evaluation.compute(predictions=preds, references=refs))
def test_two_modules_with_same_score_name(self):
preds, refs = DummyMetric.predictions_and_references()
dummy_metric = DummyMetric()
another_dummy_metric = AnotherDummyMetric()
dummy_result_1 = DummyMetric.expected_results()
dummy_result_2 = AnotherDummyMetric.expected_results()
dummy_result_1[dummy_metric.name + "_set_equality"] = dummy_result_1.pop("set_equality")
dummy_result_1[another_dummy_metric.name + "_set_equality"] = dummy_result_2["set_equality"]
combined_evaluation = combine([dummy_metric, another_dummy_metric])
self.assertDictEqual(dummy_result_1, combined_evaluation.compute(predictions=preds, references=refs))
def test_modules_from_string(self):
expected_result = {"accuracy": 0.5, "recall": 0.5, "precision": 1.0}
predictions = [0, 1]
references = [1, 1]
combined_evaluation = combine(["accuracy", "recall", "precision"])
self.assertDictEqual(
expected_result, combined_evaluation.compute(predictions=predictions, references=references)
)
def test_modules_from_string_poslabel(self):
expected_result = {"recall": 1.0, "precision": 0.5}
predictions = [0, 1, 0]
references = [1, 1, 0]
combined_evaluation = combine(["recall", "precision"])
self.assertDictEqual(
expected_result, combined_evaluation.compute(predictions=predictions, references=references, pos_label=0)
)
# Copyright 2020 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import doctest
import glob
import importlib
import inspect
import os
import re
from contextlib import contextmanager
from functools import wraps
from unittest.mock import patch
import numpy as np
import pytest
from absl.testing import parameterized
import evaluate
from evaluate import load
from .utils import _run_slow_tests, for_all_test_methods, local, slow
REQUIRE_FAIRSEQ = {"comet"}
_has_fairseq = importlib.util.find_spec("fairseq") is not None
UNSUPPORTED_ON_WINDOWS = {"code_eval"}
_on_windows = os.name == "nt"
SLOW_METRIC = {"perplexity", "regard", "toxicity"}
def skip_if_metric_requires_fairseq(test_case):
@wraps(test_case)
def wrapper(self, evaluation_module_name, evaluation_module_type):
if not _has_fairseq and evaluation_module_name in REQUIRE_FAIRSEQ:
self.skipTest('"test requires Fairseq"')
else:
test_case(self, evaluation_module_name, evaluation_module_type)
return wrapper
def skip_on_windows_if_not_windows_compatible(test_case):
@wraps(test_case)
def wrapper(self, evaluation_module_name, evaluation_module_type):
if _on_windows and evaluation_module_name in UNSUPPORTED_ON_WINDOWS:
self.skipTest('"test not supported on Windows"')
else:
test_case(self, evaluation_module_name, evaluation_module_type)
return wrapper
def skip_slow_metrics(test_case):
@wraps(test_case)
def wrapper(self, evaluation_module_name, evaluation_module_type):
if not _run_slow_tests and evaluation_module_name in SLOW_METRIC:
self.skipTest('"test is slow"')
else:
test_case(self, evaluation_module_name, evaluation_module_type)
return wrapper
def get_local_module_names():
metrics = [metric_dir.split(os.sep)[-2] for metric_dir in glob.glob("./metrics/*/")]
comparisons = [metric_dir.split(os.sep)[-2] for metric_dir in glob.glob("./comparisons/*/")]
measurements = [metric_dir.split(os.sep)[-2] for metric_dir in glob.glob("./measurements/*/")]
evaluation_modules = metrics + comparisons + measurements
evaluation_module_types = (
["metric"] * len(metrics) + ["comparison"] * len(comparisons) + ["measurement"] * len(measurements)
)
return [
{"testcase_name": f"{t}_{x}", "evaluation_module_name": x, "evaluation_module_type": t}
for x, t in zip(evaluation_modules, evaluation_module_types)
if x != "gleu" # gleu is unfinished
]
@parameterized.named_parameters(get_local_module_names())
@for_all_test_methods(skip_if_metric_requires_fairseq, skip_on_windows_if_not_windows_compatible, skip_slow_metrics)
@local
class LocalModuleTest(parameterized.TestCase):
INTENSIVE_CALLS_PATCHER = {}
evaluation_module_name = None
evaluation_module_type = None
def test_load(self, evaluation_module_name, evaluation_module_type):
doctest.ELLIPSIS_MARKER = "[...]"
evaluation_module = importlib.import_module(
evaluate.loading.evaluation_module_factory(
os.path.join(evaluation_module_type + "s", evaluation_module_name), module_type=evaluation_module_type
).module_path
)
evaluation_instance = evaluate.loading.import_main_class(evaluation_module.__name__)
# check parameters
parameters = inspect.signature(evaluation_instance._compute).parameters
self.assertTrue(all([p.kind != p.VAR_KEYWORD for p in parameters.values()])) # no **kwargs
# run doctest
with self.patch_intensive_calls(evaluation_module_name, evaluation_module.__name__):
with self.use_local_metrics(evaluation_module_type):
try:
results = doctest.testmod(evaluation_module, verbose=True, raise_on_error=True)
except doctest.UnexpectedException as e:
raise e.exc_info[1] # raise the exception that doctest caught
self.assertEqual(results.failed, 0)
self.assertGreater(results.attempted, 1)
@slow
def test_load_real_metric(self, evaluation_module_name, evaluation_module_type):
doctest.ELLIPSIS_MARKER = "[...]"
metric_module = importlib.import_module(
evaluate.loading.evaluation_module_factory(
os.path.join(evaluation_module_type, evaluation_module_name)
).module_path
)
# run doctest
with self.use_local_metrics():
results = doctest.testmod(metric_module, verbose=True, raise_on_error=True)
self.assertEqual(results.failed, 0)
self.assertGreater(results.attempted, 1)
@contextmanager
def patch_intensive_calls(self, evaluation_module_name, module_name):
if evaluation_module_name in self.INTENSIVE_CALLS_PATCHER:
with self.INTENSIVE_CALLS_PATCHER[evaluation_module_name](module_name):
yield
else:
yield
@contextmanager
def use_local_metrics(self, evaluation_module_type):
def load_local_metric(evaluation_module_name, *args, **kwargs):
return load(os.path.join(evaluation_module_type + "s", evaluation_module_name), *args, **kwargs)
with patch("evaluate.load") as mock_load:
mock_load.side_effect = load_local_metric
yield
@classmethod
def register_intensive_calls_patcher(cls, evaluation_module_name):
def wrapper(patcher):
patcher = contextmanager(patcher)
cls.INTENSIVE_CALLS_PATCHER[evaluation_module_name] = patcher
return patcher
return wrapper
# Metrics intensive calls patchers
# --------------------------------
@LocalModuleTest.register_intensive_calls_patcher("bleurt")
def patch_bleurt(module_name):
import tensorflow.compat.v1 as tf
from bleurt.score import Predictor
tf.flags.DEFINE_string("sv", "", "") # handle pytest cli flags
class MockedPredictor(Predictor):
def predict(self, input_dict):
assert len(input_dict["input_ids"]) == 2
return np.array([1.03, 1.04])
# mock predict_fn which is supposed to do a forward pass with a bleurt model
with patch("bleurt.score._create_predictor") as mock_create_predictor:
mock_create_predictor.return_value = MockedPredictor()
yield
@LocalModuleTest.register_intensive_calls_patcher("bertscore")
def patch_bertscore(module_name):
import torch
def bert_cos_score_idf(model, refs, *args, **kwargs):
return torch.tensor([[1.0, 1.0, 1.0]] * len(refs))
    # mock get_model, which would otherwise download a bert model
# mock bert_cos_score_idf which is supposed to do a forward pass with a bert model
with patch("bert_score.scorer.get_model"), patch(
"bert_score.scorer.bert_cos_score_idf"
) as mock_bert_cos_score_idf:
mock_bert_cos_score_idf.side_effect = bert_cos_score_idf
yield
@LocalModuleTest.register_intensive_calls_patcher("comet")
def patch_comet(module_name):
def load_from_checkpoint(model_path):
class Model:
def predict(self, data, *args, **kwargs):
assert len(data) == 2
scores = [0.19, 0.92]
return scores, sum(scores) / len(scores)
return Model()
    # mock download_model and load_from_checkpoint, which would otherwise download and load a COMET model
with patch("comet.download_model") as mock_download_model:
mock_download_model.return_value = None
with patch("comet.load_from_checkpoint") as mock_load_from_checkpoint:
mock_load_from_checkpoint.side_effect = load_from_checkpoint
yield
def test_seqeval_raises_when_incorrect_scheme():
metric = load(os.path.join("metrics", "seqeval"))
wrong_scheme = "ERROR"
error_message = f"Scheme should be one of [IOB1, IOB2, IOE1, IOE2, IOBES, BILOU], got {wrong_scheme}"
with pytest.raises(ValueError, match=re.escape(error_message)):
metric.compute(predictions=[], references=[], scheme=wrong_scheme)
import json
import shutil
import tempfile
from pathlib import Path
from unittest import TestCase
import evaluate
result_dict = {"metric": 1.0, "model_name": "x"}
SAVE_EXTRA_KEYS = ["_timestamp", "_git_commit_hash", "_evaluate_version", "_python_version", "_interpreter_path"]
class TestSave(TestCase):
def setUp(self):
self.save_path = Path(tempfile.mkdtemp())
def tearDown(self):
shutil.rmtree(self.save_path)
def test_save_to_folder(self):
file_path = evaluate.save(self.save_path, **result_dict)
with open(file_path, "r") as f:
loaded_result_dict = json.load(f)
for key in SAVE_EXTRA_KEYS:
_ = loaded_result_dict.pop(key)
self.assertDictEqual(result_dict, loaded_result_dict)
def test_save_to_folder_nested(self):
file_path = evaluate.save(self.save_path / "sub_dir1/sub_dir2", **result_dict)
with open(file_path, "r") as f:
loaded_result_dict = json.load(f)
for key in SAVE_EXTRA_KEYS:
_ = loaded_result_dict.pop(key)
self.assertDictEqual(result_dict, loaded_result_dict)
def test_save_to_file(self):
_ = evaluate.save(self.save_path / "test.json", **result_dict)
with open(self.save_path / "test.json", "r") as f:
loaded_result_dict = json.load(f)
for key in SAVE_EXTRA_KEYS:
_ = loaded_result_dict.pop(key)
self.assertDictEqual(result_dict, loaded_result_dict)
import json
import os
import shutil
import subprocess
import tempfile
import unittest
import numpy as np
import torch
import transformers
from datasets import load_dataset
from transformers import AutoFeatureExtractor, AutoModelForImageClassification, Trainer, TrainingArguments, pipeline
from evaluate import evaluator, load
from .utils import slow
class TestEvaluatorTrainerParity(unittest.TestCase):
def setUp(self):
self.dir_path = tempfile.mkdtemp("evaluator_trainer_parity_test")
transformers_version = transformers.__version__
branch = ""
if not transformers_version.endswith(".dev0"):
branch = f"--branch v{transformers_version}"
subprocess.run(
f"git clone --depth 3 --filter=blob:none --sparse {branch} https://github.com/huggingface/transformers",
shell=True,
cwd=self.dir_path,
)
def tearDown(self):
shutil.rmtree(self.dir_path, ignore_errors=True)
def test_text_classification_parity(self):
model_name = "philschmid/tiny-bert-sst2-distilled"
subprocess.run(
"git sparse-checkout set examples/pytorch/text-classification",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
subprocess.run(
f"python examples/pytorch/text-classification/run_glue.py"
f" --model_name_or_path {model_name}"
f" --task_name sst2"
f" --do_eval"
f" --max_seq_length 9999999999" # rely on tokenizer.model_max_length for max_length
f" --output_dir {os.path.join(self.dir_path, 'textclassification_sst2_transformers')}"
f" --max_eval_samples 80",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
with open(
f"{os.path.join(self.dir_path, 'textclassification_sst2_transformers', 'eval_results.json')}", "r"
) as f:
transformers_results = json.load(f)
eval_dataset = load_dataset("glue", "sst2", split="validation[:80]")
pipe = pipeline(task="text-classification", model=model_name, tokenizer=model_name)
task_evaluator = evaluator(task="text-classification")
evaluator_results = task_evaluator.compute(
model_or_pipeline=pipe,
data=eval_dataset,
metric="accuracy",
input_column="sentence",
label_column="label",
label_mapping={"negative": 0, "positive": 1},
strategy="simple",
)
self.assertEqual(transformers_results["eval_accuracy"], evaluator_results["accuracy"])
@slow
def test_text_classification_parity_two_columns(self):
model_name = "prajjwal1/bert-tiny-mnli"
max_eval_samples = 150
subprocess.run(
"git sparse-checkout set examples/pytorch/text-classification",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
subprocess.run(
f"python examples/pytorch/text-classification/run_glue.py"
f" --model_name_or_path {model_name}"
f" --task_name mnli"
f" --do_eval"
f" --max_seq_length 256"
f" --output_dir {os.path.join(self.dir_path, 'textclassification_mnli_transformers')}"
f" --max_eval_samples {max_eval_samples}",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
with open(
f"{os.path.join(self.dir_path, 'textclassification_mnli_transformers', 'eval_results.json')}", "r"
) as f:
transformers_results = json.load(f)
eval_dataset = load_dataset("glue", "mnli", split=f"validation_matched[:{max_eval_samples}]")
pipe = pipeline(task="text-classification", model=model_name, tokenizer=model_name, max_length=256)
task_evaluator = evaluator(task="text-classification")
evaluator_results = task_evaluator.compute(
model_or_pipeline=pipe,
data=eval_dataset,
metric="accuracy",
input_column="premise",
second_input_column="hypothesis",
label_column="label",
label_mapping={"LABEL_0": 0, "LABEL_1": 1, "LABEL_2": 2},
)
self.assertEqual(transformers_results["eval_accuracy"], evaluator_results["accuracy"])
def test_image_classification_parity(self):
        # we cannot compare to the PyTorch transformers example, which uses custom preprocessing on the images
model_name = "douwekiela/resnet-18-finetuned-dogfood"
dataset_name = "beans"
max_eval_samples = 120
raw_dataset = load_dataset(dataset_name, split="validation")
eval_dataset = raw_dataset.select(range(max_eval_samples))
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
model = AutoModelForImageClassification.from_pretrained(model_name)
def collate_fn(examples):
pixel_values = torch.stack(
[torch.tensor(feature_extractor(example["image"])["pixel_values"][0]) for example in examples]
)
labels = torch.tensor([example["labels"] for example in examples])
return {"pixel_values": pixel_values, "labels": labels}
metric = load("accuracy")
trainer = Trainer(
model=model,
args=TrainingArguments(
output_dir=os.path.join(self.dir_path, "imageclassification_beans_transformers"),
remove_unused_columns=False,
),
train_dataset=None,
eval_dataset=eval_dataset,
compute_metrics=lambda p: metric.compute(
predictions=np.argmax(p.predictions, axis=1), references=p.label_ids
),
tokenizer=None,
data_collator=collate_fn,
)
metrics = trainer.evaluate()
trainer.save_metrics("eval", metrics)
with open(
f"{os.path.join(self.dir_path, 'imageclassification_beans_transformers', 'eval_results.json')}", "r"
) as f:
transformers_results = json.load(f)
pipe = pipeline(task="image-classification", model=model_name, feature_extractor=model_name)
task_evaluator = evaluator(task="image-classification")
evaluator_results = task_evaluator.compute(
model_or_pipeline=pipe,
data=eval_dataset,
metric="accuracy",
input_column="image",
label_column="labels",
label_mapping=model.config.label2id,
strategy="simple",
)
self.assertEqual(transformers_results["eval_accuracy"], evaluator_results["accuracy"])
def test_question_answering_parity(self):
model_name_v1 = "anas-awadalla/bert-tiny-finetuned-squad"
model_name_v2 = "mrm8488/bert-tiny-finetuned-squadv2"
subprocess.run(
"git sparse-checkout set examples/pytorch/question-answering",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
# test squad_v1-like dataset
subprocess.run(
f"python examples/pytorch/question-answering/run_qa.py"
f" --model_name_or_path {model_name_v1}"
f" --dataset_name squad"
f" --do_eval"
f" --output_dir {os.path.join(self.dir_path, 'questionanswering_squad_transformers')}"
f" --max_eval_samples 100"
f" --max_seq_length 384",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
with open(
f"{os.path.join(self.dir_path, 'questionanswering_squad_transformers', 'eval_results.json')}", "r"
) as f:
transformers_results = json.load(f)
eval_dataset = load_dataset("squad", split="validation[:100]")
pipe = pipeline(
task="question-answering",
model=model_name_v1,
tokenizer=model_name_v1,
max_answer_len=30,
padding="max_length",
)
task_evaluator = evaluator(task="question-answering")
evaluator_results = task_evaluator.compute(
model_or_pipeline=pipe,
data=eval_dataset,
metric="squad",
strategy="simple",
)
self.assertEqual(transformers_results["eval_f1"], evaluator_results["f1"])
self.assertEqual(transformers_results["eval_exact_match"], evaluator_results["exact_match"])
# test squad_v2-like dataset
subprocess.run(
f"python examples/pytorch/question-answering/run_qa.py"
f" --model_name_or_path {model_name_v2}"
f" --dataset_name squad_v2"
f" --version_2_with_negative"
f" --do_eval"
f" --output_dir {os.path.join(self.dir_path, 'questionanswering_squadv2_transformers')}"
f" --max_eval_samples 100"
f" --max_seq_length 384",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
with open(
f"{os.path.join(self.dir_path, 'questionanswering_squadv2_transformers', 'eval_results.json')}", "r"
) as f:
transformers_results = json.load(f)
eval_dataset = load_dataset("squad_v2", split="validation[:100]")
pipe = pipeline(
task="question-answering",
model=model_name_v2,
tokenizer=model_name_v2,
max_answer_len=30,
)
task_evaluator = evaluator(task="question-answering")
evaluator_results = task_evaluator.compute(
model_or_pipeline=pipe,
data=eval_dataset,
metric="squad_v2",
strategy="simple",
squad_v2_format=True,
)
self.assertEqual(transformers_results["eval_f1"], evaluator_results["f1"])
self.assertEqual(transformers_results["eval_HasAns_f1"], evaluator_results["HasAns_f1"])
self.assertEqual(transformers_results["eval_NoAns_f1"], evaluator_results["NoAns_f1"])
def test_token_classification_parity(self):
model_name = "hf-internal-testing/tiny-bert-for-token-classification"
n_samples = 500
subprocess.run(
"git sparse-checkout set examples/pytorch/token-classification",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
subprocess.run(
f"python examples/pytorch/token-classification/run_ner.py"
f" --model_name_or_path {model_name}"
f" --dataset_name conll2003"
f" --do_eval"
f" --output_dir {os.path.join(self.dir_path, 'tokenclassification_conll2003_transformers')}"
f" --max_eval_samples {n_samples}",
shell=True,
cwd=os.path.join(self.dir_path, "transformers"),
)
with open(
os.path.join(self.dir_path, "tokenclassification_conll2003_transformers", "eval_results.json"), "r"
) as f:
transformers_results = json.load(f)
eval_dataset = load_dataset("conll2003", split=f"validation[:{n_samples}]")
pipe = pipeline(task="token-classification", model=model_name)
e = evaluator(task="token-classification")
evaluator_results = e.compute(
model_or_pipeline=pipe,
data=eval_dataset,
metric="seqeval",
input_column="tokens",
label_column="ner_tags",
strategy="simple",
)
self.assertEqual(transformers_results["eval_accuracy"], evaluator_results["overall_accuracy"])
self.assertEqual(transformers_results["eval_f1"], evaluator_results["overall_f1"])
from unittest import TestCase
import matplotlib.pyplot as plt
from evaluate.visualization import radar_plot
class TestViz(TestCase):
def test_invert_range(self):
data = [{"accuracy": 0.9, "precision": 0.8}, {"accuracy": 0.7, "precision": 0.6}]
model_names = ["model1", "model2"]
wrong_invert_range = ["latency_in_seconds"] # Value not present in data
with self.assertRaises(ValueError):
radar_plot(data, model_names, wrong_invert_range)
def test_output_is_plot(self):
data = [
{"accuracy": 0.9, "precision": 0.8, "latency_in_seconds": 48.1},
{"accuracy": 0.7, "precision": 0.6, "latency_in_seconds": 51.4},
]
model_names = ["model1", "model2"]
invert_range = ["latency_in_seconds"]
out_plt = radar_plot(data, model_names, invert_range)
self.assertIsInstance(out_plt, plt.Figure)
import os
import tempfile
import unittest
from contextlib import contextmanager
from copy import deepcopy
from distutils.util import strtobool
from enum import Enum
from pathlib import Path
from unittest.mock import patch
from evaluate import config
def parse_flag_from_env(key, default=False):
try:
value = os.environ[key]
except KeyError:
# KEY isn't set, default to `default`.
_value = default
else:
# KEY is set, convert it to True or False.
try:
_value = strtobool(value)
except ValueError:
# More values are supported, but let's keep the message simple.
raise ValueError(f"If set, {key} must be yes or no.")
return _value
_run_slow_tests = parse_flag_from_env("RUN_SLOW", default=False)
_run_remote_tests = parse_flag_from_env("RUN_REMOTE", default=False)
_run_local_tests = parse_flag_from_env("RUN_LOCAL", default=True)
_run_packaged_tests = parse_flag_from_env("RUN_PACKAGED", default=True)
def require_beam(test_case):
    """
    Decorator marking a test that requires Apache Beam.
    These tests are skipped when Apache Beam isn't installed.
    """
    try:
        import apache_beam  # noqa
    except ImportError:
        test_case = unittest.skip("test requires Apache Beam")(test_case)
    return test_case
def require_faiss(test_case):
"""
Decorator marking a test that requires Faiss.
These tests are skipped when Faiss isn't installed.
"""
try:
import faiss # noqa
except ImportError:
test_case = unittest.skip("test requires faiss")(test_case)
return test_case
def require_regex(test_case):
"""
Decorator marking a test that requires regex.
These tests are skipped when Regex isn't installed.
"""
try:
import regex # noqa
except ImportError:
test_case = unittest.skip("test requires regex")(test_case)
return test_case
def require_elasticsearch(test_case):
"""
Decorator marking a test that requires ElasticSearch.
These tests are skipped when ElasticSearch isn't installed.
"""
try:
import elasticsearch # noqa
except ImportError:
test_case = unittest.skip("test requires elasticsearch")(test_case)
return test_case
def require_torch(test_case):
"""
Decorator marking a test that requires PyTorch.
These tests are skipped when PyTorch isn't installed.
"""
if not config.TORCH_AVAILABLE:
test_case = unittest.skip("test requires PyTorch")(test_case)
return test_case
def require_tf(test_case):
"""
Decorator marking a test that requires TensorFlow.
These tests are skipped when TensorFlow isn't installed.
"""
if not config.TF_AVAILABLE:
test_case = unittest.skip("test requires TensorFlow")(test_case)
return test_case
def require_jax(test_case):
"""
Decorator marking a test that requires JAX.
These tests are skipped when JAX isn't installed.
"""
if not config.JAX_AVAILABLE:
test_case = unittest.skip("test requires JAX")(test_case)
return test_case
def require_pil(test_case):
"""
Decorator marking a test that requires Pillow.
These tests are skipped when Pillow isn't installed.
"""
if not config.PIL_AVAILABLE:
test_case = unittest.skip("test requires Pillow")(test_case)
return test_case
def require_transformers(test_case):
"""
Decorator marking a test that requires transformers.
These tests are skipped when transformers isn't installed.
"""
try:
import transformers # noqa F401
except ImportError:
return unittest.skip("test requires transformers")(test_case)
else:
return test_case
def slow(test_case):
"""
Decorator marking a test as slow.
Slow tests are skipped by default. Set the RUN_SLOW environment variable
to a truthy value to run them.
"""
if not _run_slow_tests or _run_slow_tests == 0:
test_case = unittest.skip("test is slow")(test_case)
return test_case
def local(test_case):
"""
Decorator marking a test as local
Local tests are run by default. Set the RUN_LOCAL environment variable
to a falsy value to not run them.
"""
if not _run_local_tests or _run_local_tests == 0:
test_case = unittest.skip("test is local")(test_case)
return test_case
def packaged(test_case):
"""
Decorator marking a test as packaged
Packaged tests are run by default. Set the RUN_PACKAGED environment variable
to a falsy value to not run them.
"""
if not _run_packaged_tests or _run_packaged_tests == 0:
test_case = unittest.skip("test is packaged")(test_case)
return test_case
def remote(test_case):
"""
Decorator marking a test as one that relies on GitHub or the Hugging Face Hub.
    Remote tests are skipped by default. Set the RUN_REMOTE environment variable
    to a truthy value to run them.
"""
if not _run_remote_tests or _run_remote_tests == 0:
test_case = unittest.skip("test requires remote")(test_case)
return test_case
def for_all_test_methods(*decorators):
def decorate(cls):
for name, fn in cls.__dict__.items():
if callable(fn) and name.startswith("test"):
for decorator in decorators:
fn = decorator(fn)
setattr(cls, name, fn)
return cls
return decorate
class RequestWouldHangIndefinitelyError(Exception):
pass
class OfflineSimulationMode(Enum):
CONNECTION_FAILS = 0
CONNECTION_TIMES_OUT = 1
HF_EVALUATE_OFFLINE_SET_TO_1 = 2
@contextmanager
def offline(mode=OfflineSimulationMode.CONNECTION_FAILS, timeout=1e-16):
"""
Simulate offline mode.
    There are three offline simulation modes:
CONNECTION_FAILS (default mode): a ConnectionError is raised for each network call.
Connection errors are created by mocking socket.socket
CONNECTION_TIMES_OUT: the connection hangs until it times out.
The default timeout value is low (1e-16) to speed up the tests.
Timeout errors are created by mocking requests.request
HF_EVALUATE_OFFLINE_SET_TO_1: the HF_EVALUATE_OFFLINE environment variable is set to 1.
        This makes the http/ftp calls of the library instantly fail and raise an OfflineModeIsEnabled error.
"""
from requests import request as online_request
def timeout_request(method, url, **kwargs):
# Change the url to an invalid url so that the connection hangs
invalid_url = "https://10.255.255.1"
if kwargs.get("timeout") is None:
raise RequestWouldHangIndefinitelyError(
f"Tried a call to {url} in offline mode with no timeout set. Please set a timeout."
)
kwargs["timeout"] = timeout
try:
return online_request(method, invalid_url, **kwargs)
except Exception as e:
# The following changes in the error are just here to make the offline timeout error prettier
e.request.url = url
max_retry_error = e.args[0]
max_retry_error.args = (max_retry_error.args[0].replace("10.255.255.1", f"OfflineMock[{url}]"),)
e.args = (max_retry_error,)
raise
def offline_socket(*args, **kwargs):
raise OSError("Offline mode is enabled.")
if mode is OfflineSimulationMode.CONNECTION_FAILS:
# inspired from https://stackoverflow.com/a/18601897
with patch("socket.socket", offline_socket):
yield
elif mode is OfflineSimulationMode.CONNECTION_TIMES_OUT:
# inspired from https://stackoverflow.com/a/904609
with patch("requests.request", timeout_request):
with patch("requests.api.request", timeout_request):
yield
elif mode is OfflineSimulationMode.HF_EVALUATE_OFFLINE_SET_TO_1:
with patch("evaluate.config.HF_EVALUATE_OFFLINE", True):
yield
else:
raise ValueError("Please use a value from the OfflineSimulationMode enum.")
@contextmanager
def set_current_working_directory_to_temp_dir(*args, **kwargs):
original_working_dir = str(Path().resolve())
with tempfile.TemporaryDirectory(*args, **kwargs) as tmp_dir:
try:
os.chdir(tmp_dir)
yield
finally:
os.chdir(original_working_dir)
def is_rng_equal(rng1, rng2):
return deepcopy(rng1).integers(0, 100, 10).tolist() == deepcopy(rng2).integers(0, 100, 10).tolist()
__version__ = "2.2.2"
from mamba_ssm.ops.selective_scan_interface import selective_scan_fn, mamba_inner_fn
from mamba_ssm.modules.mamba_simple import Mamba
from mamba_ssm.modules.mamba2 import Mamba2
from mamba_ssm.models.mixer_seq_simple import MambaLMHeadModel
from typing import Optional
import torch
from torch import Tensor
from torch.distributed import ProcessGroup
# `all_gather_into_tensor` and `reduce_scatter_tensor` are new placeholders for
# `_all_gather_base` and `_reduce_scatter_base`. They require the most recent
# version of PyTorch. The following 4 lines are for backward compatibility with
# older PyTorch.
if "all_gather_into_tensor" not in dir(torch.distributed):
torch.distributed.all_gather_into_tensor = torch.distributed._all_gather_base
if "reduce_scatter_tensor" not in dir(torch.distributed):
torch.distributed.reduce_scatter_tensor = torch.distributed._reduce_scatter_base
# Raw operation, does not support autograd, but does support async
def all_gather_raw(input_: Tensor, process_group: ProcessGroup, async_op: bool = False):
world_size = torch.distributed.get_world_size(process_group)
output = torch.empty(
world_size * input_.shape[0], *input_.shape[1:], dtype=input_.dtype, device=input_.device
)
handle = torch.distributed.all_gather_into_tensor(
output, input_.contiguous(), group=process_group, async_op=async_op
)
return output, handle
# Raw operation, does not support autograd, but does support async
def reduce_scatter_raw(input_: Tensor, process_group: ProcessGroup, async_op: bool = False):
world_size = torch.distributed.get_world_size(process_group)
assert input_.shape[0] % world_size == 0
output = torch.empty(
input_.shape[0] // world_size, *input_.shape[1:], dtype=input_.dtype, device=input_.device
)
handle = torch.distributed.reduce_scatter_tensor(
output, input_.contiguous(), group=process_group, async_op=async_op
)
return output, handle
# Raw operation, does not support autograd, but does support async
def all_reduce_raw(input_: Tensor, process_group: ProcessGroup, async_op: bool = False):
input_ = input_.contiguous()
handle = torch.distributed.all_reduce(input_, group=process_group, async_op=async_op)
return input_, handle
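# Illustrative sketch (assumption, not part of this module): with async_op=True the raw
# collectives above return a torch.distributed Work handle, so the communication can be
# overlapped with independent computation before waiting on the result (assumes
# torch.distributed is initialized).
def _example_overlap_all_gather(shard: Tensor, process_group: ProcessGroup) -> Tensor:
    # launch the all-gather asynchronously
    gathered, handle = all_gather_raw(shard, process_group, async_op=True)
    # hypothetical independent compute, overlapped with the in-flight all-gather
    local = shard * 2
    # block until `gathered` has been written, then combine with the local result
    handle.wait()
    return gathered[: shard.shape[0]] + local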
class AllGatherFunc(torch.autograd.Function):
"""Gather the input from sequence parallel region and concatenate."""
@staticmethod
def forward(ctx, input_: Tensor, process_group: ProcessGroup) -> Tensor:
ctx.process_group = process_group
output, _ = all_gather_raw(input_, process_group)
return output
@staticmethod
def backward(ctx, grad_output: Tensor):
grad_input, _ = reduce_scatter_raw(grad_output, ctx.process_group)
return grad_input, None
# Supports autograd, but does not support async
all_gather = AllGatherFunc.apply
class ReduceScatterFunc(torch.autograd.Function):
"""Reduce scatter the input from the sequence parallel region and concatenate."""
@staticmethod
def forward(ctx, input_: Tensor, process_group: ProcessGroup) -> Tensor:
ctx.process_group = process_group
output, _ = reduce_scatter_raw(input_, process_group)
return output
@staticmethod
def backward(ctx, grad_output: Tensor):
grad_input, _ = all_gather_raw(grad_output, ctx.process_group)
return grad_input, None
# Supports autograd, but does not support async
reduce_scatter = ReduceScatterFunc.apply
class AllReduceFunc(torch.autograd.Function):
"""Gather the input from sequence parallel region and concatenate."""
@staticmethod
def forward(ctx, input_: Tensor, process_group: ProcessGroup) -> Tensor:
ctx.process_group = process_group
output, _ = all_reduce_raw(input_, process_group)
return output
@staticmethod
def backward(ctx, grad_output: Tensor):
return grad_output, None
# Supports autograd, but does not support async
all_reduce = AllReduceFunc.apply
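# The three autograd wrappers above come in adjoint pairs: all_gather's backward is a
# reduce_scatter, reduce_scatter's backward is an all_gather, and all_reduce passes the
# gradient through unchanged. Minimal sketch, assuming an initialized process group and
# illustrative shapes:
def _example_sequence_parallel_roundtrip(x: Tensor, process_group: ProcessGroup) -> Tensor:
    # x: (local_seqlen, d) shard on each rank; gathered: (world_size * local_seqlen, d)
    gathered = all_gather(x, process_group)
    # ... compute on the full sequence here ...
    # scatter the result back to per-rank shards; gradients flow through both collectives
    return reduce_scatter(gathered, process_group)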
def sync_shared_params(model: torch.nn.Module, process_group: ProcessGroup):
# We want to iterate over parameters with _shared_params=True in the same order,
# as different ranks might have different numbers of parameters (e.g., only rank 0 has bias).
params_shared = {
name: p for name, p in model.named_parameters() if getattr(p, "_shared_params", False)
}
for _, p in sorted(params_shared.items()):
with torch.no_grad():
# Broadcast needs src to be global rank, not group rank
torch.distributed.broadcast(
p, src=torch.distributed.get_global_rank(process_group, 0), group=process_group
)
# Ref: https://github.com/NVIDIA/Megatron-LM/blob/52e636888cccc41e931251c417a7181fc36de926/megatron/optimizer/optimizer.py#L256
def allreduce_sequence_parallel_grad(model: torch.nn.Module, process_group: ProcessGroup):
# We want to iterate over parameters with _sequence_parallel=True in the same order,
# as different ranks might have different numbers of parameters (e.g., only rank 0 has bias).
params_seqparallel = {
name: p for name, p in model.named_parameters() if getattr(p, "_sequence_parallel", False)
}
grads = [p.grad for _, p in sorted(params_seqparallel.items())]
if grads:
with torch.no_grad():
coalesced = torch._utils._flatten_dense_tensors(grads)
torch.distributed.all_reduce(coalesced, group=process_group)
for buf, synced in zip(grads, torch._utils._unflatten_dense_tensors(coalesced, grads)):
buf.copy_(synced)
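# Where this hook typically runs in a training step -- a sketch under the assumption that
# parameters whose gradients must be summed across the sequence-parallel group are tagged
# with _sequence_parallel=True, as the helper above checks: after backward, before the
# optimizer step, so every rank applies the same summed gradient.
#
#     loss.backward()
#     allreduce_sequence_parallel_grad(model, process_group)
#     optimizer.step()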
def get_dim_for_local_rank(dim: int, world_size: int, local_rank: int, multiple_of: int = 1) -> int:
"""Get the dim for the local rank derived from splitting dim on world_size processes.
The split may not be even across the world_size processes.
"""
multiple = dim // multiple_of
div = multiple // world_size
mod = multiple % world_size
local_multiple = div + int(local_rank < mod)
return local_multiple * multiple_of
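# Worked example: splitting dim=10 over world_size=4 with multiple_of=1 gives
# div=2 and mod=2, so ranks 0 and 1 receive 3 while ranks 2 and 3 receive 2 (3+3+2+2 == 10).
#
#     assert [get_dim_for_local_rank(10, 4, r) for r in range(4)] == [3, 3, 2, 2]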
# Copyright (c) 2024, Tri Dao.
# The TensorParallel linear modules are inspired by https://github.com/NVIDIA/apex/blob/master/apex/transformer/tensor_parallel/layers.py
from typing import Optional
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
from torch.cuda.amp import custom_bwd, custom_fwd
from torch.distributed import ProcessGroup
from einops import rearrange
from mamba_ssm.distributed.distributed_utils import (
all_gather_raw,
all_reduce,
all_reduce_raw,
reduce_scatter,
reduce_scatter_raw,
)
class ParallelLinearFunc(torch.autograd.Function):
@staticmethod
@custom_fwd
def forward(ctx, x, weight, bias, process_group=None, sequence_parallel=True):
"""
If process_group is not None and sequence_parallel=True, we're doing Tensor Parallel
with sequence parallelism: we do an all_gather_raw of x before doing the matmul.
"""
ctx.compute_weight_gradient = weight.requires_grad
ctx.process_group = process_group
ctx.sequence_parallel = sequence_parallel
if torch.is_autocast_enabled():
x = x.to(dtype=torch.get_autocast_gpu_dtype())
x = x.contiguous()
if process_group is not None and sequence_parallel:
# We want to kick off the all_gather early, before weight dtype conversion
total_x, handle_x = all_gather_raw(x, process_group, async_op=True)
else:
total_x = x
if torch.is_autocast_enabled():
weight = weight.to(dtype=torch.get_autocast_gpu_dtype())
bias = bias.to(dtype=torch.get_autocast_gpu_dtype()) if bias is not None else None
weight = weight.contiguous()
if process_group is not None and sequence_parallel:
handle_x.wait()
batch_shape, n = total_x.shape[:-1], total_x.shape[-1]
batch_dim = batch_shape.numel()
# https://github.com/pytorch/pytorch/blob/5b51849b48a7dbccd297286cc0110def4706f9e7/aten/src/ATen/native/cuda/Blas.cpp#L174
output = F.linear(total_x, weight, bias)
if ctx.compute_weight_gradient:
ctx.save_for_backward(x, weight)
else:
ctx.save_for_backward(weight)
return output
@staticmethod
@custom_bwd
def backward(ctx, grad_output):
grad_output = grad_output.contiguous()
process_group = ctx.process_group
sequence_parallel = ctx.sequence_parallel
if ctx.compute_weight_gradient:
x, weight = ctx.saved_tensors
if process_group is not None and sequence_parallel:
total_x, handle_x = all_gather_raw(x, process_group, async_op=True)
else:
total_x = x
else:
(weight,) = ctx.saved_tensors
total_x = None
batch_shape = grad_output.shape[:-1]
batch_dim = batch_shape.numel()
grad_output = grad_output.reshape(batch_dim, grad_output.shape[-1])
if ctx.needs_input_grad[0]:
grad_input = F.linear(grad_output, weight.t())
grad_input = grad_input.reshape(*batch_shape, grad_input.shape[-1])
if process_group is not None:
reduce_fn = reduce_scatter_raw if sequence_parallel else all_reduce_raw
grad_input, handle_grad_input = reduce_fn(grad_input, process_group, async_op=True)
else:
grad_input = None
if ctx.needs_input_grad[1]:
assert ctx.compute_weight_gradient
if process_group is not None and sequence_parallel:
handle_x.wait()
grad_weight = torch.einsum(
"bo,bi->oi", grad_output, total_x.reshape(batch_dim, total_x.shape[-1])
)
else:
grad_weight = None
grad_bias = grad_output.sum(dim=0) if ctx.needs_input_grad[2] else None
if process_group is not None and ctx.needs_input_grad[0]:
handle_grad_input.wait()
return grad_input, grad_weight, grad_bias, None, None
def parallel_linear_func(
x: Tensor,
weight: Tensor,
bias: Optional[Tensor] = None,
process_group: Optional[ProcessGroup] = None,
sequence_parallel: bool = True,
):
return ParallelLinearFunc.apply(x, weight, bias, process_group, sequence_parallel)
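# parallel_linear_func is the autograd-aware entry point used by the modules below. A
# direct call looks like the sketch here (illustrative names and shapes; an initialized
# process group is assumed). With sequence_parallel=True, x is this rank's sequence shard
# and the matmul sees the all-gathered input.
#
#     y = parallel_linear_func(x_shard, weight, bias, process_group=pg, sequence_parallel=True)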
class ColumnParallelLinear(nn.Linear):
def __init__(
self,
in_features: int,
out_features: int,
process_group: ProcessGroup,
bias: bool = True,
sequence_parallel=True,
multiple_of=1,
device=None,
dtype=None,
) -> None:
world_size = torch.distributed.get_world_size(process_group)
if out_features % multiple_of:
raise ValueError(f"out_features ({out_features}) must be a multiple of {multiple_of}")
multiple = out_features // multiple_of
# We want to split @multiple across world_size, but it could be an uneven split
div = multiple // world_size
mod = multiple % world_size
# The first @mod ranks get @div + 1 copies, the rest get @div copies
local_multiple = div + int(torch.distributed.get_rank(process_group) < mod)
super().__init__(
in_features, local_multiple * multiple_of, bias=bias, device=device, dtype=dtype
)
self.process_group = process_group
self.sequence_parallel = sequence_parallel
def forward(self, x):
# If self.sequence_parallel is True, we're doing Tensor Parallel with sequence parallelism:
# we do an all_gather of x before doing the matmul.
# If not, then the input is already gathered.
return parallel_linear_func(
x,
self.weight,
self.bias,
process_group=self.process_group,
sequence_parallel=self.sequence_parallel,
)
class RowParallelLinear(nn.Linear):
def __init__(
self,
in_features: int,
out_features: int,
process_group: ProcessGroup,
bias: bool = True,
sequence_parallel=True,
multiple_of=1,
device=None,
dtype=None,
) -> None:
world_size = torch.distributed.get_world_size(process_group)
rank = torch.distributed.get_rank(process_group)
if in_features % multiple_of:
raise ValueError(f"in_features ({in_features}) must be a multiple of {multiple_of}")
multiple = in_features // multiple_of
# We want to split @multiple across world_size, but it could be an uneven split
div = multiple // world_size
mod = multiple % world_size
# The first @mod ranks get @div + 1 copies, the rest get @div copies
local_multiple = div + int(rank < mod)
# Only rank 0 will have bias
super().__init__(
local_multiple * multiple_of,
out_features,
bias=bias and rank == 0,
device=device,
dtype=dtype,
)
self.process_group = process_group
self.sequence_parallel = sequence_parallel
def forward(self, x):
"""
We're doing Tensor Parallel with sequence parallelism: we do the matmul and then
a reduce_scatter of the result.
"""
out = parallel_linear_func(x, self.weight, self.bias)
reduce_fn = reduce_scatter if self.sequence_parallel else all_reduce
return reduce_fn(out, self.process_group)
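# Sketch of the usual pairing of the two layers above (Megatron-style tensor-parallel MLP):
# the column-parallel layer splits the hidden dimension across ranks and the row-parallel
# layer consumes those shards, so no communication is needed between the two matmuls.
# The class name and sizes are illustrative; a real run needs torch.distributed initialized.
class _ExampleParallelMLP(nn.Module):
    def __init__(self, d_model: int, d_ff: int, process_group: ProcessGroup, **factory_kwargs):
        super().__init__()
        self.fc1 = ColumnParallelLinear(d_model, d_ff, process_group, **factory_kwargs)
        self.fc2 = RowParallelLinear(d_ff, d_model, process_group, **factory_kwargs)

    def forward(self, x: Tensor) -> Tensor:
        return self.fc2(F.gelu(self.fc1(x)))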
class VocabParallelEmbedding(nn.Embedding):
def __init__(self, num_embeddings, *args, process_group=None, padding_idx=None, **kwargs):
self.process_group = process_group
if process_group is not None:
world_size = torch.distributed.get_world_size(process_group)
if num_embeddings % world_size != 0:
raise ValueError(
f"num_embeddings ({num_embeddings}) must be divisible by "
f"world_size ({world_size})"
)
if world_size > 1 and padding_idx is not None:
raise RuntimeError("ParallelEmbedding does not support padding_idx")
else:
world_size = 1
super().__init__(num_embeddings // world_size, *args, padding_idx=padding_idx, **kwargs)
def forward(self, input: Tensor) -> Tensor:
if self.process_group is None:
return super().forward(input)
else:
rank = torch.distributed.get_rank(self.process_group)
vocab_size = self.num_embeddings
vocab_start_index, vocab_end_index = rank * vocab_size, (rank + 1) * vocab_size
# Mask token ids outside this rank's vocab partition (True means the id belongs to another rank).
input_ids_mask = (input < vocab_start_index) | (input >= vocab_end_index)
input = input - vocab_start_index
input[input_ids_mask] = 0
embeddings = super().forward(input)
embeddings[input_ids_mask] = 0.0
return embeddings
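# Worked example of the masking above (illustrative sizes): with a 64000-token vocab split
# over world_size=2, each rank holds 32000 rows and rank 1 owns ids 32000..63999. An input
# id of 100 is out of range on rank 1, so it is shifted, set to index 0, and its embedding
# row zeroed; the rank that does own the id contributes the real row, and the partial
# results are summed by the reduce_fn in ParallelEmbeddings below (or by the caller).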
class ColumnParallelEmbedding(nn.Embedding):
def __init__(self, num_embeddings, embedding_dim, *args, process_group=None, **kwargs):
self.process_group = process_group
if process_group is not None:
world_size = torch.distributed.get_world_size(process_group)
if embedding_dim % world_size != 0:
raise ValueError(
f"embedding_dim ({embedding_dim}) must be divisible by "
f"world_size ({world_size})"
)
else:
world_size = 1
super().__init__(num_embeddings, embedding_dim // world_size, *args, **kwargs)
class ParallelEmbeddings(nn.Module):
def __init__(
self,
embed_dim,
vocab_size,
max_position_embeddings,
process_group,
padding_idx=None,
sequence_parallel=True,
device=None,
dtype=None,
):
"""
If max_position_embeddings <= 0, there are no position embeddings
"""
factory_kwargs = {"device": device, "dtype": dtype}
super().__init__()
self.process_group = process_group
self.sequence_parallel = sequence_parallel
self.word_embeddings = VocabParallelEmbedding(
vocab_size,
embed_dim,
padding_idx=padding_idx,
process_group=process_group,
**factory_kwargs,
)
self.max_position_embeddings = max_position_embeddings
if self.max_position_embeddings > 0:
self.position_embeddings = ColumnParallelEmbedding(
max_position_embeddings, embed_dim, process_group=process_group, **factory_kwargs
)
def forward(self, input_ids, position_ids=None, combine_batch_seqlen_dim=False):
"""
input_ids: (batch, seqlen)
position_ids: (batch, seqlen)
"""
batch_size, seqlen = input_ids.shape
world_size = torch.distributed.get_world_size(self.process_group)
embeddings = self.word_embeddings(input_ids)
if self.max_position_embeddings > 0:
if position_ids is None:
position_ids = torch.arange(seqlen, dtype=torch.long, device=input_ids.device)
position_embeddings = self.position_embeddings(position_ids)
if world_size <= 1:
embeddings = embeddings + position_embeddings
else:
partition_dim = self.position_embeddings.embedding_dim
rank = torch.distributed.get_rank(self.process_group)
embeddings[
..., rank * partition_dim : (rank + 1) * partition_dim
] += position_embeddings
if combine_batch_seqlen_dim:
embeddings = rearrange(embeddings, "b s d -> (b s) d")
reduce_fn = reduce_scatter if self.sequence_parallel else all_reduce
return embeddings if world_size <= 1 else reduce_fn(embeddings, self.process_group)
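# Minimal usage sketch for ParallelEmbeddings (illustrative sizes; an initialized process
# group is assumed). With sequence_parallel=True and combine_batch_seqlen_dim=True, the
# trailing reduce_scatter leaves each rank with a (batch * seqlen / world_size, d) shard of
# the summed embeddings; with sequence_parallel=False each rank gets the full all-reduced
# (batch * seqlen, d) tensor.
#
#     emb = ParallelEmbeddings(
#         embed_dim=1024, vocab_size=64000, max_position_embeddings=2048,
#         process_group=process_group, sequence_parallel=True,
#     )
#     hidden = emb(input_ids, combine_batch_seqlen_dim=True)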