mctask_experimental.py 9.09 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
""" Multiple Choice Format Experiments.
TODO: Generalize the formatting of fewshot examples.
"""
import abc
import dataclasses
import hashlib
import os
import typing
from argparse import ArgumentError
from dataclasses import dataclass

import numpy as np
from attr import field

import lm_eval.base as base
from lm_eval.metrics import mean

15

16
17
@dataclass
class MultipleChoiceDoc:
    """ Structure for storing a single multiple choice document.

    `id` is not an `__init__` argument: it is derived from `question` in
    `__post_init__` as the SHA-224 hex digest of the UTF-8 encoded question.
    """
    # The question text shown in the prompt.
    question: str
    # Labels for the options; `keys[i]` labels `options[i]`.
    keys: typing.List[str]  # Should these be the same type as gold?
    # The answer options, in display order.
    options: typing.List[str]
    # Index into `options` (and `keys`) of the correct answer.
    gold: int
    # Must be `dataclasses.field`: an `attr` field here is treated as a plain
    # default value, which silently turns `id` into an `__init__` parameter.
    id: str = dataclasses.field(init=False)
    context: typing.Optional[str] = None  # Any extra context prior to the question.

    def __post_init__(self):
        self.id = hashlib.sha224(self.question.encode('utf-8')).hexdigest()

29

30
class BaseMultipleChoiceTask(base.Task, abc.ABC):
    """ Base Multiple Choice Task.

    Subclasses decide the prompt layout (`format_prompt`), the gold target
    string (`doc_to_target`) and the candidate continuations that are scored
    (`loglikelihood_continuation`).
    """

    def doc_to_text(self, doc: MultipleChoiceDoc):
        """ Returns the prompt for `doc`, preceded by its context line if any. """
        ctx = f"{doc.context}\n" if doc.context else ""
        return ctx + self.format_prompt(doc)

    # `abc.abstractclassmethod` is deprecated since Python 3.3; stacking
    # `classmethod` on top of `abstractmethod` is the documented equivalent.
    @classmethod
    @abc.abstractmethod
    def format_prompt(cls, doc: MultipleChoiceDoc) -> str:
        """ Returns the prompt text for `doc`, without the leading context. """
        pass

    @abc.abstractmethod
    def doc_to_target(self, doc: MultipleChoiceDoc) -> str:
        """ Returns the gold continuation string for `doc`. """
        pass

    @abc.abstractmethod
    def loglikelihood_continuation(self, doc: MultipleChoiceDoc) -> typing.List[str]:
        """ Returns the candidate continuations to score, one per option. """
        pass

    def construct_requests(self, doc: MultipleChoiceDoc, ctx: str):
        """ Builds one loglikelihood request per candidate continuation.

        Each continuation is prefixed with a single space, and only the first
        element of each `base.rf.loglikelihood` request is kept.
        """
        return [
            base.rf.loglikelihood(ctx, f" {cont}")[0]
            for cont in self.loglikelihood_continuation(doc)
        ]

    def process_results(self, doc: MultipleChoiceDoc, results: typing.List):
        """ Scores the per-continuation `results` for `doc`.

        Returns a dict with:
        - "acc": 1.0 if the highest scoring continuation is the gold one.
        - "acc_norm": same, after dividing each score by the character
          length of its continuation.
        - "answer_bundle": (model_answer, model_answer_index, is_correct,
          question_id).
        """
        gold = doc.gold
        # Plain `int` so the bundle carries a builtin, not a numpy scalar.
        ans = int(np.argmax(results))
        is_correct = 1. if ans == gold else 0.
        # Normalize by completion length.
        conts = self.loglikelihood_continuation(doc)
        completion_len = np.array([float(len(cont)) for cont in conts])
        normalized = np.asarray(results) / completion_len
        acc_norm = 1. if np.argmax(normalized) == gold else 0.
        return {
            "acc": is_correct,
            "acc_norm": acc_norm,
            # Bundle answers: (model_answer, model_answer_index, is_correct, question_id).
            "answer_bundle": (doc.keys[ans], ans, is_correct, doc.id),
        }

    def higher_is_better(self):
        """ Declares, per metric name, whether larger values are better. """
        return {
            "acc": True,
            "acc_norm": True,
            "answer_bundle": True,
        }

    def aggregation(self):
        """ Maps each metric name to the function that aggregates its values. """
        return {
            "acc": mean,
            "acc_norm": mean,
            "answer_bundle": answer_bundle
        }

jon-tow's avatar
jon-tow committed
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
    # UNCOMMENT TO WRITE OUT THE QUESTION TABLE
    # TODO: Write a function for this.
    #
    # def process_results(self, doc: MultipleChoiceDoc, results: typing.List):
    #     gold = doc.gold
    #     ans = np.argmax(results)
    #     is_correct = 1. if ans == gold else 0.
    #     # Normalize by completion length.
    #     conts = self.loglikelihood_continuation(doc)
    #     completion_len = np.array([float(len(i)) for i in conts])
    #     acc_norm = 1. if np.argmax(results / completion_len) == gold else 0.
    #     return {
    #         "acc": is_correct,
    #         "acc_norm": acc_norm,
    #         # Bundle questions: (question_id, question, option_0, option_1, option_2, option_3)
    #         "question_bundle": (doc.id, doc.question, doc.options),
    #     }

    # def higher_is_better(self):
    #     return {
    #         "acc": True,
    #         "acc_norm": True,
    #         "question_bundle": True,
    #     }

    # def aggregation(self):
    #     return {
    #         "acc": mean,
    #         "acc_norm": mean,
    #         "question_bundle": question_bundle,
    #     }

118

119
120
121
122
def answer_bundle(items):
    """ Bundles answers into a csv file.

    Appends one row per item to `question-by-question-results.csv` inside the
    directory named by the `QUESTION_RESULT_PATH` environment variable.

    Args:
    - items: iterable of (model_answer, model_answer_index, is_correct,
      question_id) tuples.

    Returns 0 (the aggregation value is unused; the file is the output).
    Raises `KeyError` if `QUESTION_RESULT_PATH` is not set.
    """
    import csv
    from pathlib import Path

    cols = ["model_answer", "model_answer_index", "is_correct", "question_id"]
    out_path = Path(os.environ["QUESTION_RESULT_PATH"]) / "question-by-question-results.csv"
    # The file is opened in append mode, so only emit the header when the
    # file is new or empty; otherwise every call would inject a header row
    # into the middle of the data.
    needs_header = not out_path.exists() or out_path.stat().st_size == 0
    # `newline=""` is required by the csv module to avoid doubled line
    # endings on platforms that translate "\n".
    with open(out_path, 'a', encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(cols)
        writer.writerows(items)
    return 0


def question_bundle(items):
    """ Bundles questions into a csv file.

    Appends one row per item to `question-table.csv` inside the directory
    named by the `QUESTION_RESULT_PATH` environment variable. The number of
    `option_<i>` columns is taken from the first item.

    Args:
    - items: iterable of (question_id, question, options) tuples.

    Returns 0 (the aggregation value is unused; the file is the output).
    Raises `KeyError` if `QUESTION_RESULT_PATH` is not set and there is at
    least one item to write.
    """
    import csv
    from pathlib import Path

    rows = list(items)
    if not rows:
        # Nothing to write, and no first item to size the header from.
        return 0

    option_cols = [f"option_{i}" for i in range(len(rows[0][2]))]
    cols = ["question_id", "question", *option_cols]

    out_path = Path(os.environ["QUESTION_RESULT_PATH"]) / "question-table.csv"
    # Append mode: only emit the header when the file is new or empty so
    # repeated calls do not inject header rows into the data.
    needs_header = not out_path.exists() or out_path.stat().st_size == 0
    # `with` guarantees the handle is closed even if a write raises;
    # `newline=""` is required by the csv module.
    with open(out_path, 'a', encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(cols)
        writer.writerows([item[0], item[1], *item[2]] for item in rows)
    return 0


def key2num(doc: MultipleChoiceDoc, key: str) -> str:
    """ Maps a document key to its 1-based position in `doc.keys`.

    The position is returned as a string (e.g. "1"), ready to be used as a
    prompt label or continuation. Raises `ValueError` if `key` is not one of
    `doc.keys`.
    """
    return str(doc.keys.index(key) + 1)  # `+ 1` for 1-based indexing.

156

157
def key2letter(doc: MultipleChoiceDoc, key: str) -> str:
    """ Maps a key to the capital letter at the key's position in `doc.keys`.

    The first key maps to "A", the second to "B", and so on.
    """
    position = doc.keys.index(key)
    return chr(ord("A") + position)
163

164

165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def format_key(key: str, type: str):
    """ Formats a multiple choice key. E.g.
        format_key("A", "period") => "A."
        format_key("A", "parens") => "(A)"
        format_key("A",  "colon") => "A:"
    Args:
    - key: The key to format.
    - type: "period" | "parens" | "colon"
    Raises:
    - ArgumentError: if `type` is not one of the supported styles.
    """
    if type == "parens":
        return f"({key})"
    if type == "period":
        return f"{key}."
    if type == "colon":
        return f"{key}:"
    # `argparse.ArgumentError` requires (argument, message); calling it with
    # no arguments fails with a `TypeError` instead of raising the error.
    raise ArgumentError(
        None,
        f"Unknown key format type {type!r}; expected 'period', 'parens' or 'colon'."
    )


class MC_NoOptionList_OptionLL_Task(BaseMultipleChoiceTask):
    """ "freeform"
    Format:
        <Context>
        Question: <question>
        Answer:
    Continuation:
        loglikelihood_continuation = <option_i>
    """

    # Declared as a classmethod to match the `cls` first parameter.
    @classmethod
    def format_prompt(cls, doc: MultipleChoiceDoc) -> str:
        """ Returns the question followed by an empty answer slot. """
        prompt = "Question: " + doc.question + "\n"
        prompt += "Answer:"
        return prompt

    def doc_to_target(self, doc: MultipleChoiceDoc) -> str:
        """ Returns the gold option text, prefixed with a space. """
        return " " + doc.options[doc.gold]

    def loglikelihood_continuation(self, doc: MultipleChoiceDoc) -> typing.List[str]:
        """ Returns the option texts as the continuations to score. """
        return list(doc.options)


class MC_WithOptionList_OptionLL_Task(BaseMultipleChoiceTask):
    """ "option"
    Format:
        <Context>
        Question: <question>
        <key1>: <option1>
        <key2>: <option2>
        ...
        Answer:
    Continuation:
        loglikelihood_continuation = <option_i>
    """

    # Declared as a classmethod to match the `cls` first parameter.
    @classmethod
    def format_prompt(cls, doc: MultipleChoiceDoc) -> str:
        """ Returns the question, one "<key>: <option>" line per option, and
        an empty answer slot.
        """
        option_lines = [
            f"{format_key(doc.keys[i], 'colon')} {option}"
            for i, option in enumerate(doc.options)
        ]
        prompt = "Question: " + doc.question + "\n"
        prompt += "\n".join(option_lines)
        prompt += "\nAnswer:"
        return prompt

    def doc_to_target(self, doc: MultipleChoiceDoc) -> str:
        """ Returns the gold option text, prefixed with a space. """
        return " " + doc.options[doc.gold]

    def loglikelihood_continuation(self, doc: MultipleChoiceDoc) -> typing.List[str]:
        """ Returns the option texts as the continuations to score. """
        return list(doc.options)


class MC_WithOptionList_LetterLL_Task(BaseMultipleChoiceTask):
    """ "letter"
    Format:
        <Context>
        Question: <question>
        A: <option1>
        B: <option2>
        ...
        Answer:
    Continuation:
        loglikelihood_continuation = <key_i>
    """

    # Declared as a classmethod to match the `cls` first parameter.
    @classmethod
    def format_prompt(cls, doc: MultipleChoiceDoc) -> str:
        """ Returns the question, one "<letter>: <option>" line per option,
        and an empty answer slot. Letters come from each key's position.
        """
        option_lines = [
            f"{format_key(key2letter(doc, doc.keys[i]), 'colon')} {option}"
            for i, option in enumerate(doc.options)
        ]
        prompt = "Question: " + doc.question + "\n"
        prompt += "\n".join(option_lines)
        prompt += "\nAnswer:"
        return prompt

    # NOTE(review): the prompt labels options with positional letters
    # (`key2letter`), while the target and continuations below use the raw
    # `doc.keys`. These only agree when the keys are already "A", "B", ...
    # in order - confirm that holds for every dataset using this task.
    def doc_to_target(self, doc: MultipleChoiceDoc) -> str:
        """ Returns the gold key, prefixed with a space. """
        return " " + doc.keys[doc.gold]

    def loglikelihood_continuation(self, doc: MultipleChoiceDoc) -> typing.List[str]:
        """ Returns the raw document keys as the continuations to score. """
        return list(doc.keys)


class MC_WithOptionList_NumLL_Task(BaseMultipleChoiceTask):
    """ "number"
    Format:
        <Context>
        Question: <question>
        1: <option1>
        2: <option2>
        ...
        Answer:
    Continuation:
        loglikelihood_continuation = <key2num(key_i)>
    """

    # Declared as a classmethod to match the `cls` first parameter.
    @classmethod
    def format_prompt(cls, doc: MultipleChoiceDoc) -> str:
        """ Returns the question, one "<number>: <option>" line per option,
        and an empty answer slot. Numbers are 1-based key positions.
        """
        option_lines = [
            f"{format_key(key2num(doc, doc.keys[i]), 'colon')} {option}"
            for i, option in enumerate(doc.options)
        ]
        prompt = "Question: " + doc.question + "\n"
        prompt += "\n".join(option_lines)
        prompt += "\nAnswer:"
        return prompt

    def doc_to_target(self, doc: MultipleChoiceDoc) -> str:
        """ Returns the 1-based gold index, prefixed with a space. """
        return f" {doc.gold + 1}"  # `+ 1` for 1-based indexing.

    def loglikelihood_continuation(self, doc: MultipleChoiceDoc) -> typing.List[str]:
        """ Returns the 1-based position of every key as the continuations. """
        return [key2num(doc, key) for key in doc.keys]


# TODO: Try to come up with a way to do this at runtime.
# Select the task class from the `MC_SETTING` environment variable at import
# time. An unset variable raises `KeyError`; an unrecognised value is only
# reported, leaving `MULTIPLE_CHOICE_TASK` undefined.
_MC_SETTING_TO_TASK = {
    "freeform": MC_NoOptionList_OptionLL_Task,
    "option": MC_WithOptionList_OptionLL_Task,
    "letter": MC_WithOptionList_LetterLL_Task,
    "number": MC_WithOptionList_NumLL_Task,
}
if os.environ["MC_SETTING"] in _MC_SETTING_TO_TASK:
    MULTIPLE_CHOICE_TASK = _MC_SETTING_TO_TASK[os.environ["MC_SETTING"]]
else:
    print("No such MC_SETTING:", os.environ["MC_SETTING"])