common_utils.py

import re
from functools import cache
from typing import TYPE_CHECKING, Union

from transformers import AutoTokenizer


if TYPE_CHECKING:
    import transformers


DEFAULT_SEQ_LENGTHS = (
    # 131072,
    # 65536,
    # 32768,
    # 16384,
    # 8192,
    4096,
)


@cache
def get_tokenizer(
    tokenizer=None, pretrained=None, **kwargs
) -> Union["transformers.PreTrainedTokenizer", "transformers.PreTrainedTokenizerFast"]:
    # Accept either argument name; extra kwargs are accepted but unused.
    pretrained = tokenizer or pretrained
    assert pretrained, "No tokenizer or pretrained provided."
    print(f"using tokenizer {pretrained}")
    return AutoTokenizer.from_pretrained(pretrained, trust_remote_code=True)
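
# A minimal usage sketch (the checkpoint name is illustrative, not one this
# module ships with). Because of @cache, repeated calls with the same
# arguments return the same tokenizer object rather than reloading it:
#
#   >>> tok = get_tokenizer(pretrained="gpt2")
#   >>> tok is get_tokenizer(pretrained="gpt2")
#   True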


def postprocess_pred(predict_str: str) -> str:
    predict_str = predict_str.strip()

    # Replace ASCII control characters (0x00-0x1f) with newlines, then strip
    # any leading/trailing whitespace this introduces
    np_pattern = re.compile(r"[\x00-\x1f]")
    predict_str = np_pattern.sub("\n", predict_str).strip()

    return predict_str
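
# Illustrative behaviour: control characters become newlines and outer
# whitespace is stripped.
#
#   >>> postprocess_pred("  answer\x00one  ")
#   'answer\none'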


def string_match_all(preds: list[str], refs: list[list[str]]) -> float:
    # Each prediction scores the fraction of its reference strings found in it
    # (case-insensitive substring match), averaged over all predictions.
    score = sum(
        sum(1.0 if r.lower() in pred.lower() else 0.0 for r in ref) / len(ref)
        for pred, ref in zip(preds, refs)
    ) / len(preds)
    return score
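
# Worked example (illustrative strings): one of the two references appears
# in the prediction, so it scores 1/2.
#
#   >>> string_match_all(["the cat sat"], [["cat", "dog"]])
#   0.5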


def string_match_part(preds: list[str], refs: list[list[str]]) -> float:
    # Partial credit: a prediction scores 1.0 if *any* of its reference strings
    # appears in it (case-insensitive), averaged over all predictions.
    score = sum(
        max(1.0 if r.lower() in pred.lower() else 0.0 for r in ref)
        for pred, ref in zip(preds, refs)
    ) / len(preds)
    return score
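
# Worked example (illustrative strings): since "cat" appears, the prediction
# receives full credit even though "dog" is absent.
#
#   >>> string_match_part(["the cat sat"], [["cat", "dog"]])
#   1.0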


def process_results(doc: dict, results: list[str]) -> dict[str, float]:
    # hacky: initialize every length to -1.0 so unevaluated lengths can be
    # filtered out later in aggregate_metrics
    metrics = {str(length): -1.0 for length in DEFAULT_SEQ_LENGTHS}
    input_len = doc["max_length"]
    pred = postprocess_pred(results[0])
    score = string_match_all([pred], [doc["outputs"]])
    metrics[str(input_len)] = score
    return metrics
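
# Sketch of the expected inputs (field values are illustrative): only the
# sample's own length receives a real score.
#
#   >>> doc = {"max_length": 4096, "outputs": ["needle"]}
#   >>> process_results(doc, ["I found the needle"])
#   {'4096': 1.0}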


def process_results_part(doc: dict, results: list[str]) -> dict[str, float]:
    # hacky: initialize every length to -1.0 so unevaluated lengths can be
    # filtered out later in aggregate_metrics
    metrics = {str(length): -1.0 for length in DEFAULT_SEQ_LENGTHS}
    input_len = doc["max_length"]
    pred = postprocess_pred(results[0])
    score = string_match_part([pred], [doc["outputs"]])
    metrics[str(input_len)] = score
    return metrics


def aggregate_metrics(metrics: list[float]) -> float:
    # -1.0 marks lengths a sample was not evaluated at; drop those entries
    res = [x for x in metrics if x != -1]
    if not res:
        # no samples with this length
        return 0.0
    return sum(res) / len(res)
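
# Illustrative aggregation: the -1.0 placeholders are dropped before
# averaging.
#
#   >>> aggregate_metrics([-1.0, 0.5, 1.0])
#   0.75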