common_utils.py 2.29 KB
Newer Older
Baber's avatar
cleanup  
Baber committed
1
2
3
4
5
6
7
8
9
10
11
import re
from functools import cache
from typing import TYPE_CHECKING, Union

from transformers import AutoTokenizer


if TYPE_CHECKING:
    import transformers


Baber's avatar
nit  
Baber committed
12
DEFAULT_SEQ_LENGTHS = (4096,)
Baber's avatar
cleanup  
Baber committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56


@cache
def get_tokenizer(
    tokenizer=None, pretrained=None, **kwargs
) -> Union["transformers.PreTrainedTokenizer", "transformers.PreTrainedTokenizerFast"]:
    pretrained = tokenizer or pretrained
    assert pretrained, "No tokenizer or pretrained provided."
    print("using tokenizer ", pretrained)
    return AutoTokenizer.from_pretrained(pretrained, trust_remote_code=True)


def postprocess_pred(predict_str: str) -> str:
    predict_str = predict_str.strip()

    # Remove all non-printable characters
    np_pattern = re.compile(r"[\x00-\x1f]")
    predict_str = np_pattern.sub("\n", predict_str).strip()

    return predict_str


def string_match_all(preds: list[str], refs: list[list[str]]) -> float:
    score = sum(
        [
            sum([1.0 if r.lower() in pred.lower() else 0.0 for r in ref]) / len(ref)
            for pred, ref in zip(preds, refs)
        ]
    ) / len(preds)
    return score


def string_match_part(preds: list[str], refs: list[list[str]]) -> float:
    score = max(
        [
            sum([1.0 if r.lower() in pred.lower() else 0.0 for r in ref]) / len(ref)
            for pred, ref in zip(preds, refs)
        ]
    ) / len(preds)
    return score


def process_results(doc: dict, results: list[str]) -> dict[str, float]:
    # hacky: set all other lengths to -1
Baber's avatar
Baber committed
57
    metrics = {str(length): -1.0 for length in DEFAULT_SEQ_LENGTHS}
Baber's avatar
cleanup  
Baber committed
58
59
60
61
62
63
64
65
66
    input_len = doc["max_length"]
    pred = postprocess_pred(results[0])
    score = string_match_all([pred], [doc["outputs"]])
    metrics[str(input_len)] = score
    return metrics


def process_results_part(doc: dict, results: list[str]) -> dict[str, float]:
    # hacky: set all other lengths to -1
Baber's avatar
Baber committed
67
    metrics = {str(length): -1.0 for length in DEFAULT_SEQ_LENGTHS}
Baber's avatar
cleanup  
Baber committed
68
69
70
71
72
73
74
75
76
77
78
79
80
    input_len = doc["max_length"]
    pred = postprocess_pred(results[0])
    score = string_match_part([pred], [doc["outputs"]])
    metrics[str(input_len)] = score
    return metrics


def aggregate_metrics(metrics: list[float]) -> float:
    res = [x for x in metrics if x != -1]
    if not res:
        # we don't have any samples with this length
        return 0.0
    return sum(res) / len(res)