# easy_models.py
from typing import Optional, Tuple, Union

import torch
import torch.nn as nn
import torch.nn.functional as F
from coati.models.generation import generate
7
from coati.models.utils import log_probs_from_logits
8
from peft import PeftModel
9
10
11
from torch.nn.modules import Module
from transformers import BloomConfig, BloomForCausalLM

12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

class Actor(Module):
    """
    Actor model base class.

    Wraps a language model and exposes sequence generation (``generate``)
    and per-token action log-probabilities (``forward``).

    Args:
        model (nn.Module): Actor Model.
    """

    def __init__(self, model: nn.Module) -> None:
        super().__init__()
        self.model = model

    @staticmethod
    def _build_action_mask(
        sequences: torch.Tensor, input_len: int, eos_token_id: Optional[int]
    ) -> torch.Tensor:
        """Build the boolean mask over the generated part of ``sequences``.

        ``sequences`` is indexed as ``(batch, position)``; positions from
        ``input_len`` onwards are treated as generated tokens.

        Args:
            sequences: Token ids, prompt followed by generated tokens.
            input_len: Number of leading positions that belong to the prompt.
            eos_token_id: Id marking end of sequence, or ``None`` to keep
                every generated position.

        Returns:
            A bool tensor with one column per generated position. A position
            is ``True`` up to and including the first ``eos_token_id`` and
            ``False`` afterwards. When nothing was generated the result has
            zero columns.
        """
        num_generated = sequences.size(1) - input_len
        if num_generated <= 0:
            # A ``-0:`` slice would select *every* column, so the empty case
            # is returned explicitly instead of going through the slicing below.
            return torch.zeros((sequences.size(0), 0), dtype=torch.bool, device=sequences.device)

        if eos_token_id is None:
            action_mask = torch.ones_like(sequences, dtype=torch.bool)
        else:
            # left padding may be applied, only mask action
            action_mask = (sequences[:, input_len:] == eos_token_id).cumsum(dim=-1) == 0
            # Shift right by one so the eos token itself stays unmasked, and
            # prepend columns for the input so the mask lines up with ``sequences``.
            action_mask = F.pad(action_mask, (1 + input_len, -1), value=True)  # include eos token and input
        action_mask[:, :input_len] = False
        action_mask = action_mask[:, 1:]
        # Equivalent to taking the last ``num_generated`` columns, written with
        # a non-negative start index.
        return action_mask[:, max(input_len - 1, 0) :]

    @torch.no_grad()
    def generate(
        self, input_ids: torch.Tensor, return_action_mask: bool = True, **kwargs
    ) -> Tuple[torch.LongTensor, Optional[torch.LongTensor], Optional[torch.BoolTensor]]:
        """Generate sequences and the masks that go with them.

        Args:
            input_ids: Prompt token ids, indexed as ``(batch, position)``.
            return_action_mask: When ``False`` the third returned item is ``None``.
            **kwargs: Forwarded to ``coati.models.generation.generate``.
                ``pad_token_id`` and ``eos_token_id`` are also read here.

        Returns:
            A 3-tuple ``(sequences, attention_mask, action_mask)``.
            ``attention_mask`` is ``None`` unless ``pad_token_id`` is given;
            otherwise it is a long tensor that is 1 where ``sequences`` differs
            from ``pad_token_id``. ``action_mask`` covers only the generated
            positions.
        """
        sequences = generate(self.model, input_ids, **kwargs)
        attention_mask = None
        pad_token_id = kwargs.get("pad_token_id", None)
        if pad_token_id is not None:
            attention_mask = sequences.not_equal(pad_token_id).to(dtype=torch.long, device=sequences.device)
        if not return_action_mask:
            return sequences, attention_mask, None
        action_mask = self._build_action_mask(sequences, input_ids.size(1), kwargs.get("eos_token_id", None))
        return sequences, attention_mask, action_mask

    def forward(
        self, sequences: torch.LongTensor, num_actions: int, attention_mask: Optional[torch.Tensor] = None
    ) -> torch.Tensor:
        """Returns action log probs

        Args:
            sequences: Token ids, indexed as ``(batch, position)``.
            num_actions: Number of trailing positions to return log probs for.
            attention_mask: Optional mask forwarded to the wrapped model.

        Returns:
            The last ``num_actions`` columns of the per-token log probs.
        """
        output = self.model(sequences, attention_mask=attention_mask)
        logits = output["logits"]
        # Logits at position t score the token at position t + 1.
        log_probs = log_probs_from_logits(logits[:, :-1, :], sequences[:, 1:])
        if num_actions == 0:
            # ``[:, -0:]`` would return every column; zero actions means none.
            return log_probs[:, :0]
        return log_probs[:, -num_actions:]

    def get_base_model(self):
        """Return the wrapped model."""
        return self.model


class BLOOMActor(Actor):
    """
    BLOOM Actor model.

    Args:
        pretrained (str): Pretrained model name or path.
        config (BloomConfig): Model config. Only used when ``pretrained`` is
            not given; if both are ``None`` a default ``BloomConfig()`` is used.
        checkpoint (bool): Enable gradient checkpointing.
        lora_path (str): When given, the model is wrapped with
            ``PeftModel.from_pretrained(model, lora_path)``.
    """

    def __init__(
        self,
        pretrained: Optional[str] = None,
        config: Optional[BloomConfig] = None,
        checkpoint: bool = False,
        lora_path: Optional[str] = None,
    ) -> None:
        if pretrained is not None:
            model = BloomForCausalLM.from_pretrained(pretrained)
        elif config is not None:
            model = BloomForCausalLM(config)
        else:
            model = BloomForCausalLM(BloomConfig())
        if lora_path is not None:
            model = PeftModel.from_pretrained(model, lora_path)
        if checkpoint:
            model.gradient_checkpointing_enable()
        super().__init__(model)

    def print_trainable_parameters(self) -> None:
        """Print how many parameters of the wrapped model are trainable.

        Delegates to the wrapped model's own ``print_trainable_parameters``
        when it has one. Otherwise (for example when no ``lora_path`` was
        given) the counts are computed from ``parameters()`` here instead of
        raising ``AttributeError``.
        """
        model = self.get_base_model()
        printer = getattr(model, "print_trainable_parameters", None)
        if callable(printer):
            printer()
            return
        total = 0
        trainable = 0
        for param in model.parameters():
            count = param.numel()
            total += count
            if param.requires_grad:
                trainable += count
        percent = 100 * trainable / total if total else 0.0
        print(f"trainable params: {trainable} || all params: {total} || trainable%: {percent}")