# lora_constructor.py
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Dict, Optional

import torch.nn as nn
from coati.models.lora import LoraLinear


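# Per-layer LoRA hyperparameters, mirroring the corresponding attributes of a
# coati LoraLinear module (they are read back from it by extract_lora_config below).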
@dataclass
class LoRAConfig:
    r: int = 0
    lora_alpha: int = 1
    lora_dropout: float = 0
    fan_in_fan_out: bool = False


class LoRAConstructor:
    """
    Tools for reconstructing a model from a remote LoRA model.
    (Transferring only the LoRA weights costs much less than sending the full model!)

    Usage:
        Step 1 (Sender):
            filter_state_dict_lora()

        Step 2 (Sender, Optional):
            extract_lora_config()

        Step 3 (Sender):
            send state_dict_lora and lora_config_dict

        Step 4 (Receiver):
            reconstruct_increase()

        Step 5 (Receiver):
            load_state_dict_increase()
    """

    def __init__(self):
        self.lora_config_dict = None

    def register_lora_config(self, lora_config_dict: Dict[str, Any]):
        self.lora_config_dict = lora_config_dict

    def reconstruct_increase(self, state_dict_lora: Dict[str, Any], lora_config_dict: Optional[Dict[str, Any]]):
        """
        Merge each (xxx.lora_A, xxx.lora_B) pair back into xxx.weight.
        Warning: the resulting xxx.weight is the weight increment, not the full weight.
        """
        if lora_config_dict is not None:
            self.register_lora_config(lora_config_dict)

        state_dict_increase = OrderedDict()
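        # The pairing below relies on ordering: self.lora_config_dict must list
        # layers in the same order as state_dict_lora (both are OrderedDicts),
        # which the layer_prefix asserts double-check.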
        config_iter = iter(self.lora_config_dict.items())
        lora_A, lora_B, layer_prefix = None, None, None
        for k, v in state_dict_lora.items():
            if k.rpartition(".")[-1] == "lora_A":
                # Remember lora_A and wait for the matching lora_B of the same layer.
                lora_A = v
                layer_prefix = k.rpartition(".")[0]
            elif k.rpartition(".")[-1] == "lora_B":
                assert layer_prefix == k.rpartition(".")[0], "unmatched (lora_A, lora_B) pair"
                layer_prefix_2, config = next(config_iter)
                assert layer_prefix_2 == layer_prefix, "unmatched (state_dict, config_dict) pair"
                lora_B = v
                weight_data_increase = self._compute(lora_A, lora_B, config)
                state_dict_increase[layer_prefix + ".weight"] = weight_data_increase
                lora_A, lora_B, layer_prefix = None, None, None
            else:
                raise ValueError("unexpected key")
        return state_dict_increase

    def _compute(self, lora_A, lora_B, config: LoRAConfig = LoRAConfig()):
        def T(w):
            return w.T if config.fan_in_fan_out else w

        if config.r > 0:
            # Standard LoRA merge: increment = (lora_alpha / r) * B @ A.
            scaling = config.lora_alpha / config.r
            weight_data_increase = T(lora_B @ lora_A) * scaling
            return weight_data_increase
        return 0

    def load_state_dict_increase(self, model: nn.Module, state_dict_increase: Dict[str, Any]):
        """
        The final reconstruction step: add each increment onto the receiver's
        current weights.
        """
        # naive approach: rebuild the affected entries and load them non-strictly
        model.load_state_dict({k: v + model.state_dict()[k] for k, v in state_dict_increase.items()}, strict=False)

    @staticmethod
    def filter_state_dict_lora(state_dict: Dict[str, Any], keep_non_lora: bool = False):
        """
        Split out the LoRA entries of a state_dict.
        If keep_non_lora is True, also return the non-LoRA state_dict.
        """
        state_dict_lora = OrderedDict()
        state_dict_non_lora = OrderedDict()
        for k, v in state_dict.items():
            if "lora_A" in k or "lora_B" in k:
                state_dict_lora[k] = v
            elif keep_non_lora:
                state_dict_non_lora[k] = v
        if keep_non_lora:
            return state_dict_lora, state_dict_non_lora
        else:
            return state_dict_lora, None
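
    # Sender-side sketch (steps 1-3 of the Usage above), assuming `model` is an
    # nn.Module whose linear layers have already been converted to coati
    # LoraLinear modules:
    #
    #     state_dict_lora, _ = LoRAConstructor.filter_state_dict_lora(model.state_dict())
    #     lora_config_dict = LoRAConstructor.extract_lora_config(model)
    #     # ... transfer state_dict_lora and lora_config_dict to the receiver ...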

    @staticmethod
    def extract_lora_config(model: nn.Module) -> Dict[str, LoRAConfig]:
        """
        Extract the LoRAConfig of every LoraLinear module in the model.
        Returns an OrderedDict mapping module name -> LoRAConfig.
        """
        lora_config_dict = OrderedDict()

        for name, child in model.named_modules():
            if isinstance(child, LoraLinear):
                lora_config_dict[name] = LoRAConfig(
                    r=child.r,
                    lora_alpha=child.lora_alpha,
                    lora_dropout=child.lora_dropout,
                    fan_in_fan_out=child.fan_in_fan_out,
                )

        return lora_config_dict
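

if __name__ == "__main__":
    # Minimal receiver-side sketch (steps 4-5), run on hand-built tensors instead
    # of a real coati model. The layer name "layer1" and the hyperparameters
    # below are illustrative assumptions, not values taken from coati.
    import torch

    r, d_in, d_out = 4, 8, 8
    state_dict_lora = OrderedDict(
        [
            ("layer1.lora_A", torch.randn(r, d_in)),   # A has shape (r, in_features)
            ("layer1.lora_B", torch.randn(d_out, r)),  # B has shape (out_features, r)
        ]
    )
    lora_config_dict = OrderedDict([("layer1", LoRAConfig(r=r, lora_alpha=16))])

    constructor = LoRAConstructor()
    increase = constructor.reconstruct_increase(state_dict_lora, lora_config_dict)

    # The reconstructed increment equals (lora_alpha / r) * B @ A.
    expected = (16 / r) * state_dict_lora["layer1.lora_B"] @ state_dict_lora["layer1.lora_A"]
    assert torch.allclose(increase["layer1.weight"], expected)

    # Step 5 on a real model would then be:
    #     constructor.load_state_dict_increase(model, increase)
    # where `model` has a module named "layer1" with a matching .weight shape.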