function.py 4.7 KB
Newer Older
chenych's avatar
chenych committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

chenych's avatar
update  
chenych committed
15
16
17
import importlib.util
import os
import sys
chenych's avatar
chenych committed
18
from abc import ABC, abstractmethod
chenych's avatar
chenych committed
19
from collections import defaultdict
chenych's avatar
update  
chenych committed
20
21
from functools import partial
from typing import Callable, Dict, List, Optional, Tuple, TypedDict
chenych's avatar
chenych committed
22

chenych's avatar
chenych committed
23
24
25
import torch
from transformers import PreTrainedTokenizer

chenych's avatar
chenych committed
26
from ...protocol import DataProto
chenych's avatar
Update  
chenych committed
27
from .config import RewardConfig
chenych's avatar
chenych committed
28
29
30
31


class RewardScore(TypedDict):
    """Score dictionary returned by a reward function for a single response."""

    # Scalar reward; the reward managers in this module write this value into the reward tensor.
    overall: float
    # Optional component scores, recorded as metrics alongside `overall`.
    # NOTE(review): presumably the format-compliance and answer-accuracy parts of the
    # reward -- confirm against the reward function implementations.
    format: Optional[float]
    accuracy: Optional[float]


chenych's avatar
chenych committed
36
# Per-sample reward function: takes two strings (the managers below pass the decoded
# response and its ground truth) and returns one RewardScore.
SequentialRewardFunction = Callable[[str, str], RewardScore]
chenych's avatar
update  
chenych committed
37

chenych's avatar
chenych committed
38
# Batched reward function: takes two lists of strings (the managers below pass the decoded
# responses and their ground truths) and returns a list of RewardScore.
BatchRewardFunction = Callable[[List[str], List[str]], List[RewardScore]]
chenych's avatar
update  
chenych committed
39

chenych's avatar
chenych committed
40
41

class FunctionRewardManager(ABC):
    """Reward manager for rule-based reward.

    Loads a user-supplied reward function from a Python source file, binds the
    configured keyword arguments to it, and leaves the actual scoring strategy
    to subclasses via ``compute_reward``.
    """

    def __init__(self, config: RewardConfig, tokenizer: PreTrainedTokenizer):
        """Load the reward function described by ``config`` and keep the tokenizer.

        Args:
            config: Reward settings. This method reads ``reward_function`` (path to
                the source file), ``reward_function_name`` (attribute to fetch from
                the loaded module) and ``reward_function_kwargs`` (keyword arguments
                bound to the function).
            tokenizer: Stored on the instance as ``self.tokenizer``.

        Raises:
            ValueError: If ``config.reward_function`` is ``None``.
            FileNotFoundError: If the reward function file does not exist.
            RuntimeError: If the file cannot be imported as a module.
            AttributeError: If the loaded module has no attribute named
                ``config.reward_function_name``.
        """
        if config.reward_function is None:
            raise ValueError("Reward function is not provided.")

        if not os.path.exists(config.reward_function):
            raise FileNotFoundError(f"Reward function file {config.reward_function} not found.")

        module_name = "custom_reward_fn"
        spec = importlib.util.spec_from_file_location(module_name, config.reward_function)
        # spec_from_file_location returns None when no loader matches the file
        # (e.g. an unrecognised extension); fail with a clear message instead of
        # an AttributeError on None further down.
        if spec is None or spec.loader is None:
            raise RuntimeError(
                f"Failed to load reward function: cannot create an import spec for {config.reward_function}."
            )

        module = importlib.util.module_from_spec(spec)
        previous_module = sys.modules.get(module_name)
        # Register before executing so the module can be found by name while its
        # own top-level code runs.
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except Exception as e:
            # Do not leave a half-initialised module registered after a failure.
            if previous_module is None:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous_module
            raise RuntimeError(f"Failed to load reward function: {e}") from e

        if not hasattr(module, config.reward_function_name):
            raise AttributeError(f"Module {module} does not have function {config.reward_function_name}.")

        reward_fn = getattr(module, config.reward_function_name)
        print(f"Using reward function `{config.reward_function_name}` from `{config.reward_function}`.")
        # Tolerate a missing kwargs mapping so `partial` is not handed `**None`.
        self.reward_fn = partial(reward_fn, **(config.reward_function_kwargs or {}))
        self.config = config
        self.tokenizer = tokenizer

    @abstractmethod
    def compute_reward(self, data: DataProto) -> Tuple[torch.Tensor, Dict[str, List[float]]]:
        """Compute reward for a batch of data.

        Returns:
            A ``(reward_tensor, reward_metrics)`` pair, where ``reward_metrics`` maps
            a metric name to the list of its per-sample values.
        """
        ...


class SequentialFunctionRewardManager(FunctionRewardManager):
    """Reward manager that calls the reward function once per response."""

    reward_fn: SequentialRewardFunction

    def compute_reward(self, data: DataProto) -> Tuple[torch.Tensor, Dict[str, List[float]]]:
        """Score every response in ``data`` one at a time.

        Reads ``data.batch["responses"]``, ``data.batch["response_mask"]`` and
        ``data.non_tensor_batch["ground_truth"]``. Each response is truncated to the
        sum of its mask row, decoded with ``self.tokenizer`` and passed to
        ``self.reward_fn`` together with its ground truth.

        Returns:
            ``(reward_tensor, reward_metrics)``. ``reward_tensor`` is a float32 tensor
            shaped like ``data.batch["responses"]`` that is zero everywhere except at
            index ``length - 1`` of each row, which holds that row's ``"overall"``
            score. ``reward_metrics`` maps every key of the returned score dicts to
            the list of its values in batch order.
        """
        response_ids = data.batch["responses"]
        # Convert all lengths to Python ints in one transfer: this avoids a
        # device sync per sample and keeps slicing/indexing independent of the
        # mask's dtype.
        response_lengths = [int(n) for n in data.batch["response_mask"].sum(dim=-1).tolist()]
        reward_tensor = torch.zeros_like(response_ids, dtype=torch.float32)
        reward_metrics = defaultdict(list)

        for i in range(len(data)):
            cur_length = response_lengths[i]
            valid_response_ids = response_ids[i][:cur_length]
            response_str = self.tokenizer.decode(
                valid_response_ids, skip_special_tokens=self.config.skip_special_tokens
            )
            ground_truth = data.non_tensor_batch["ground_truth"][i]

            score = self.reward_fn(response_str, ground_truth)
            # NOTE(review): for a zero-length response the index is -1, so the score
            # lands on the last position of the row -- confirm this is intended.
            reward_tensor[i, cur_length - 1] = score["overall"]
            for key, value in score.items():
                reward_metrics[key].append(value)

        return reward_tensor, reward_metrics


class BatchFunctionRewardManager(FunctionRewardManager):
    """Reward manager that calls the reward function once for the whole batch."""

    reward_fn: BatchRewardFunction

    def compute_reward(self, data: DataProto) -> Tuple[torch.Tensor, Dict[str, List[float]]]:
        """Score all responses in ``data`` with a single reward function call.

        Reads ``data.batch["responses"]``, ``data.batch["response_mask"]`` and
        ``data.non_tensor_batch["ground_truth"]``. Each response is truncated to the
        sum of its mask row and decoded with ``self.tokenizer``; the decoded strings
        and ground truths are then passed to ``self.reward_fn`` as two lists.

        Returns:
            ``(reward_tensor, reward_metrics)``. ``reward_tensor`` is a float32 tensor
            shaped like ``data.batch["responses"]`` that is zero everywhere except at
            index ``length - 1`` of each row, which holds that row's ``"overall"``
            score. ``reward_metrics`` maps every key of the returned score dicts to
            the list of its values in batch order.

        Raises:
            ValueError: If the reward function returns a different number of scores
                than the number of responses it was given.
        """
        response_ids = data.batch["responses"]
        # Convert all lengths to Python ints in one transfer: this avoids a
        # device sync per sample and keeps slicing/indexing independent of the
        # mask's dtype.
        response_lengths = [int(n) for n in data.batch["response_mask"].sum(dim=-1).tolist()]

        response_str, ground_truth = [], []
        for i in range(len(data)):
            valid_response_ids = response_ids[i][: response_lengths[i]]
            response_str.append(
                self.tokenizer.decode(valid_response_ids, skip_special_tokens=self.config.skip_special_tokens)
            )
            ground_truth.append(data.non_tensor_batch["ground_truth"][i])

        scores = self.reward_fn(response_str, ground_truth)
        # A short result would silently leave rows at zero reward and misalign the
        # per-sample metrics, so reject it explicitly.
        if len(scores) != len(response_str):
            raise ValueError(
                f"Reward function returned {len(scores)} scores for {len(response_str)} responses."
            )

        reward_tensor = torch.zeros_like(response_ids, dtype=torch.float32)
        reward_metrics = defaultdict(list)
        for i, score in enumerate(scores):
            # NOTE(review): for a zero-length response the index is -1, so the score
            # lands on the last position of the row -- confirm this is intended.
            reward_tensor[i, response_lengths[i] - 1] = score["overall"]
            for key, value in score.items():
                reward_metrics[key].append(value)

        return reward_tensor, reward_metrics