"""
Tests that the pretrained models produce the correct scores on the STSbenchmark dataset
"""

import csv
import gzip
import os
from typing import Generator, List, Tuple

import pytest
from torch.utils.data import DataLoader

from sentence_transformers import (
    SentencesDataset,
    SentenceTransformer,
    losses,
    util,
)
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator
from sentence_transformers.readers import InputExample


@pytest.fixture()
def sts_resource() -> Generator[Tuple[List[InputExample], List[InputExample]], None, None]:
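    """Download STSbenchmark if needed and yield (train_samples, test_samples) as InputExample lists."""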
    sts_dataset_path = "datasets/stsbenchmark.tsv.gz"
    if not os.path.exists(sts_dataset_path):
        util.http_get("https://sbert.net/datasets/stsbenchmark.tsv.gz", sts_dataset_path)

    stsb_train_samples = []
    stsb_test_samples = []
    with gzip.open(sts_dataset_path, "rt", encoding="utf8") as f:
        reader = csv.DictReader(f, delimiter="\t", quoting=csv.QUOTE_NONE)
        for row in reader:
            score = float(row["score"]) / 5.0  # Normalize score to range 0 ... 1
            inp_example = InputExample(texts=[row["sentence1"], row["sentence2"]], label=score)

            if row["split"] == "test":
                stsb_test_samples.append(inp_example)
            elif row["split"] == "train":
                stsb_train_samples.append(inp_example)
    yield stsb_train_samples, stsb_test_samples


@pytest.fixture()
def nli_resource() -> Generator[List[InputExample], None, None]:
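    """Download AllNLI if needed and yield up to 10,000 training InputExamples with integer labels."""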
    nli_dataset_path = "datasets/AllNLI.tsv.gz"
    if not os.path.exists(nli_dataset_path):
        util.http_get("https://sbert.net/datasets/AllNLI.tsv.gz", nli_dataset_path)

    label2int = {"contradiction": 0, "entailment": 1, "neutral": 2}
    nli_train_samples = []
    max_train_samples = 10000
    with gzip.open(nli_dataset_path, "rt", encoding="utf8") as f:
        reader = csv.DictReader(f, delimiter="\t", quoting=csv.QUOTE_NONE)
        for row in reader:
            if row["split"] == "train":
                label_id = label2int[row["label"]]
                nli_train_samples.append(InputExample(texts=[row["sentence1"], row["sentence2"]], label=label_id))
                if len(nli_train_samples) >= max_train_samples:
                    break
    yield nli_train_samples


def evaluate_stsb_test(model: SentenceTransformer, expected_score: float, test_samples: List[InputExample]) -> None:
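    """Score the model on the STS test set (0-100); it must exceed expected_score or lie within 0.1 of it."""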
    evaluator = EmbeddingSimilarityEvaluator.from_input_examples(test_samples, name="sts-test")
    score = model.evaluate(evaluator) * 100
    print("STS-Test Performance: {:.2f} vs. exp: {:.2f}".format(score, expected_score))
    assert score > expected_score or abs(score - expected_score) < 0.1


@pytest.mark.slow
def test_train_stsb_slow(
    distilbert_base_uncased_model: SentenceTransformer, sts_resource: Tuple[List[InputExample], List[InputExample]]
) -> None:
    model = distilbert_base_uncased_model
    sts_train_samples, sts_test_samples = sts_resource
    train_dataset = SentencesDataset(sts_train_samples, model)
    train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)
    train_loss = losses.CosineSimilarityLoss(model=model)
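    # Fine-tune for one epoch with cosine-similarity loss; 10% of the steps are used for LR warm-up.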
    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        evaluator=None,
        epochs=1,
        evaluation_steps=1000,
        warmup_steps=int(len(train_dataloader) * 0.1),
        use_amp=True,
    )

    evaluate_stsb_test(model, 80.0, sts_test_samples)


@pytest.mark.skipif("CI" in os.environ, reason="This test is too slow for the CI (~8 minutes)")
def test_train_stsb(
    distilbert_base_uncased_model: SentenceTransformer, sts_resource: Tuple[List[InputExample], List[InputExample]]
) -> None:
    model = distilbert_base_uncased_model
    sts_train_samples, sts_test_samples = sts_resource
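    # Use only 100 training samples so the test stays fast; the expected score is correspondingly lower (60 vs. 80).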
    train_dataset = SentencesDataset(sts_train_samples[:100], model)
    train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)
    train_loss = losses.CosineSimilarityLoss(model=model)
    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        evaluator=None,
        epochs=1,
        evaluation_steps=1000,
        warmup_steps=int(len(train_dataloader) * 0.1),
        use_amp=True,
    )

    evaluate_stsb_test(model, 60.0, sts_test_samples)


@pytest.mark.slow
def test_train_nli_slow(
    distilbert_base_uncased_model: SentenceTransformer,
    nli_resource: List[InputExample],
    sts_resource: Tuple[List[InputExample], List[InputExample]],
) -> None:
    model = distilbert_base_uncased_model
    _, sts_test_samples = sts_resource
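    # Train with softmax classification loss on NLI pairs, then check transfer to STS similarity.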
    train_dataset = SentencesDataset(nli_resource, model=model)
    train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)
    train_loss = losses.SoftmaxLoss(
        model=model,
        sentence_embedding_dimension=model.get_sentence_embedding_dimension(),
        num_labels=3,
    )
    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        evaluator=None,
        epochs=1,
        warmup_steps=int(len(train_dataloader) * 0.1),
        use_amp=True,
    )

    evaluate_stsb_test(model, 50.0, sts_test_samples)


@pytest.mark.skipif("CI" in os.environ, reason="This test is too slow for the CI (~25 minutes)")
def test_train_nli(
    distilbert_base_uncased_model: SentenceTransformer,
    nli_resource: List[InputExample],
    sts_resource: Tuple[List[InputExample], List[InputExample]],
) -> None:
    model = distilbert_base_uncased_model
    _, sts_test_samples = sts_resource
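    # Use only 100 NLI training samples so the test stays fast.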
    train_dataset = SentencesDataset(nli_resource[:100], model=model)
    train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)
    train_loss = losses.SoftmaxLoss(
        model=model,
        sentence_embedding_dimension=model.get_sentence_embedding_dimension(),
        num_labels=3,
    )
    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        evaluator=None,
        epochs=1,
        warmup_steps=int(len(train_dataloader) * 0.1),
        use_amp=True,
    )

    evaluate_stsb_test(model, 50.0, sts_test_samples)