_utils.py 11.9 KB
Newer Older
1
import copy
2
import math
3
from contextlib import nullcontext
4
from typing import Any, Callable, Dict, List, Optional
5

6
import torch
7
import torch.distributed as dist
8
9
10
from torch import Tensor
from torch import distributed as dist
from torch.distributed import ProcessGroup
11
from torch.nn import Module
12
from torch.optim import Adam, Optimizer
13
from torch.testing import assert_close
14

15
16
from colossalai.booster import Booster
from colossalai.booster.plugin import HybridParallelPlugin
17
from colossalai.booster.plugin.hybrid_parallel_plugin import HybridParallelModule
18
from colossalai.lazy import LazyInitContext
19
from colossalai.pipeline.stage_manager import PipelineStageManager
20
from colossalai.shardformer import ShardConfig, ShardFormer
21
from colossalai.shardformer._utils import getattr_
22
from colossalai.shardformer.policies.auto_policy import Policy
23
from colossalai.tensor.d_tensor.api import is_customized_distributed_tensor, is_distributed_tensor
24
25


26
27
28
29
30
31
32
33
34
def build_model(
    model_fn,
    enable_fused_normalization=True,
    enable_tensor_parallelism=True,
    enable_flash_attention=False,
    enable_jit_fused=False,
    enable_sequence_parallelism=False,
    use_lazy_init: bool = False,
):
35
    # create new model
36
37
38
39
40
41
42
    ctx = LazyInitContext() if use_lazy_init else nullcontext()
    with ctx:
        # create new model
        org_model = model_fn()
        model_copy = copy.deepcopy(org_model)
    if use_lazy_init:
        ctx.materialize(org_model)
43
    # shard model
44
45
46
47
48
49
50
    shard_config = ShardConfig(
        enable_fused_normalization=enable_fused_normalization,
        enable_tensor_parallelism=enable_tensor_parallelism,
        enable_flash_attention=enable_flash_attention,
        enable_jit_fused=enable_jit_fused,
        enable_sequence_parallelism=enable_sequence_parallelism,
    )
51
    model_copy = copy.deepcopy(org_model)
52
    shard_former = ShardFormer(shard_config=shard_config)
ver217's avatar
ver217 committed
53
    sharded_model, shared_params = shard_former.optimize(model_copy)
54
    return org_model.cuda(), sharded_model.cuda()
55
56


57
58
59
60
61
62
63
64
def build_pipeline_model(
    model_fn,
    stage_manager=None,
    enable_fused_normalization=False,
    enable_tensor_parallelism=False,
    use_lazy_init: bool = False,
    policy: Optional[Policy] = None,
):
65
66
67
68
69
70
71
72
73
    ctx = LazyInitContext() if use_lazy_init else nullcontext()
    with ctx:
        # create new model
        org_model = model_fn()
        model_copy = copy.deepcopy(org_model)
    if use_lazy_init:
        ctx.materialize(org_model)

    # shard model
74
75
76
77
78
    shard_config = ShardConfig(
        enable_fused_normalization=enable_fused_normalization,
        enable_tensor_parallelism=enable_tensor_parallelism,
        pipeline_stage_manager=stage_manager,
    )
Jianghai's avatar
Jianghai committed
79

80
    shard_former = ShardFormer(shard_config=shard_config)
Jianghai's avatar
Jianghai committed
81
    sharded_model, shared_params = shard_former.optimize(model_copy, policy=policy)
82
83
84
    return org_model.cuda(), sharded_model.cuda()


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def run_forward(original_model, sharded_model, data_gen_fn, output_transform_fn, loss_fn):
    # prepare input
    data = data_gen_fn()
    data = {k: v.cuda() for k, v in data.items()}
    # switch to train mode
    original_model.train()
    sharded_model.train()
    # run forward
    org_output = original_model(**data)
    org_output = output_transform_fn(org_output)
    org_loss = loss_fn(org_output)

    shard_output = sharded_model(**data)
    shard_output = output_transform_fn(shard_output)
    shard_loss = loss_fn(shard_output)
100
    return org_output, org_loss, shard_output, shard_loss
101
102


103
def check_state_dict(org_model: Module, sharded_model: Module, name: str = ""):
104
105
106
    org_sd = org_model.state_dict()
    shard_sd = sharded_model.state_dict()
    for k, v in org_sd.items():
107
        assert k in shard_sd, f"{name} {k} not in sharded model"
108
        shard_v = shard_sd[k]
109
110
111
        assert v.shape == shard_v.shape, f"{name} {k} shape mismatch, {v.shape} vs {shard_v.shape}"
        assert v.dtype == shard_v.dtype, f"{name} {k} dtype mismatch, {v.dtype} vs {shard_v.dtype}"
        assert torch.equal(v, shard_v), f"{name} {k} value mismatch"
112
113


114
115
def build_model_from_hybrid_plugin(model_fn: Callable, loss_fn: Callable, test_config: Dict[str, Any]):
    use_lazy_init = False
116
117
    if "use_lazy_init" in test_config:
        use_lazy_init = test_config.pop("use_lazy_init")
118

119
    ctx = LazyInitContext() if use_lazy_init else nullcontext()
120
    with ctx:
121
        org_model = model_fn()
122
123
        sharded_model = copy.deepcopy(org_model)
    if use_lazy_init:
124
        ctx.materialize(org_model)
125

126
    org_model = org_model.cuda()
127
128
129
130
    org_optimizer = Adam(org_model.parameters(), lr=1e-3)
    sharded_optimizer = Adam(sharded_model.parameters(), lr=1e-3)
    criterion = loss_fn

131
132
    plugin = HybridParallelPlugin(**test_config)
    booster = Booster(plugin=plugin)
133

134
    sharded_model, sharded_optimizer, criterion, _, _ = booster.boost(sharded_model, sharded_optimizer, criterion)
135
136
137
    return org_model, org_optimizer, sharded_model, sharded_optimizer, criterion, booster


138
139
140
141
142
143
144
145
146
def run_forward_backward_with_hybrid_plugin(
    org_model: Module,
    sharded_model: Module,
    sharded_optimizer: Optimizer,
    data_gen_fn: Callable,
    output_transform_fn: Callable,
    criterion: Callable,
    booster: Booster,
):
147
148
    org_model.cuda()
    sharded_model.cuda()
149
150
151
152
153
154
155

    def _criterion(outputs, inputs):
        outputs = output_transform_fn(outputs)
        loss = criterion(outputs)
        return loss

    data = data_gen_fn()
156
157

    if booster.plugin.enable_sequence_parallelism and booster.plugin.tp_size != 0:
158
        seq_len = data["input_ids"].shape[-1]
159
160
        lcm = booster.plugin.tp_size * seq_len // math.gcd(booster.plugin.tp_size, seq_len)
        times = lcm // seq_len
161
        input_shape = data["input_ids"].shape
162
163
        for k, v in data.items():
            if v.shape == input_shape:
164
                data[k] = v.repeat((1,) * (v.dim() - 1) + (times,))
165

166
167
    sharded_model.train()
    if booster.plugin.stage_manager is not None:
168
        for k, v in data.items():
169
            if torch.is_tensor(v) or "Tensor" in v.__class__.__name__:
170
171
                new_shape = [1] * v.dim()
                new_shape[0] = 4
172
                data[k] = v.to("cuda").repeat(*new_shape)
173

174
        data_iter = iter([data])
175
176
177
178
        sharded_output = booster.execute_pipeline(
            data_iter, sharded_model, _criterion, sharded_optimizer, return_loss=True, return_outputs=True
        )
        sharded_loss = sharded_output["loss"]
179
180
181
    else:
        data = {k: v.cuda() for k, v in data.items()}
        sharded_output = sharded_model(**data)
182

183
        sharded_loss = criterion(sharded_output)
184
        sharded_optimizer.backward(sharded_loss)
185
186

    org_model.train()
187
    data = {k: v.cuda() for k, v in data.items()}
188
    org_output = org_model(**data)
189

190
191
192
193
194
195
    org_loss = criterion(org_output)
    org_loss.backward()

    return org_loss, org_output, sharded_loss, sharded_output


196
197
198
199
200
201
202
203
def check_output_hidden_state(
    org_output: Tensor,
    sharded_output: Tensor,
    stage_manager: Optional[PipelineStageManager] = None,
    atol: float = 1e-5,
    rtol: float = 1e-3,
    dim: int = 0,
):
204
205
    org_hidden_state = org_output.last_hidden_state

206
    if stage_manager and stage_manager.is_last_stage(ignore_chunk=True):
207
        sharded_hidden_state = sharded_output["outputs"]["last_hidden_state"]
Jianghai's avatar
Jianghai committed
208
209
    else:
        sharded_hidden_state = sharded_output.last_hidden_state
210

211
    assert_close(org_hidden_state.float(), sharded_hidden_state.float(), atol=atol, rtol=rtol)
212
213
214


def check_loss(org_loss: Tensor, sharded_loss: Tensor, atol: float = 1e-5, rtol: float = 1e-3):
215
    assert torch.allclose(org_loss.float(), sharded_loss.float(), atol=atol, rtol=rtol)
216
217
218
219
220
221
222
223
224
225
226
227


def check_weight(
    org_model: Module,
    sharded_model: Module,
    layer_suffix: List[str],
    tp_group: Optional[ProcessGroup] = None,
    dim: int = 0,
    atol: float = 1e-5,
    rtol: float = 1e-3,
    verbose: bool = False,
):
228
229
230
231
    for suffix in layer_suffix:
        org_weight = getattr_(org_model, suffix).weight
        sharded_weight = getattr_(sharded_model, suffix).weight

232
233
234
235
        # skip if layer is not held by this process
        if sharded_weight is None:
            continue

236
237
        if is_distributed_tensor(sharded_weight) or is_customized_distributed_tensor(sharded_weight):
            sharded_weight_list = [
238
                torch.zeros_like(sharded_weight).to("cuda") for _ in range(dist.get_world_size(tp_group))
239
240
241
242
243
244
245
            ]
            dist.all_gather(sharded_weight_list, sharded_weight, tp_group)
            sharded_weight = torch.cat(sharded_weight_list, dim=dim)

        if verbose and dist.get_rank() == 0:
            print(f"'{suffix}' weight: {org_weight}, {sharded_weight}")

246
        assert_close(org_weight.float(), sharded_weight.float(), atol=atol, rtol=rtol)
247
248
249
250
251
252
253
254
255
256
257
258
259


def get_grad_tensors_for_check(
    org_model: Module,
    sharded_model: Module,
    layer_suffix: List[str],
    tp_group: ProcessGroup = None,
    dim: int = 0,
    atol: float = 1e-5,
    rtol: float = 1e-3,
    verbose: bool = False,
    name: str = None,
):
260
261
262
263
264
265
    grad_to_check = {}
    for suffix in layer_suffix:
        org_grad = getattr_(org_model, suffix).weight.grad
        shard_grad = getattr_(sharded_model, suffix).weight.grad
        shard_weight = getattr_(sharded_model, suffix).weight
        if is_distributed_tensor(shard_weight) or is_customized_distributed_tensor(shard_weight):
266
            shard_grad_list = [torch.zeros_like(shard_grad).to("cuda") for _ in range(dist.get_world_size(tp_group))]
267
268
269
270
271
            dist.all_gather(shard_grad_list, shard_grad, tp_group)
            shard_grad = torch.cat(shard_grad_list, dim=dim)

        # embedding may be resized when using tensor parallel
        if shard_grad.shape[0] > org_grad.shape[0]:
272
            shard_grad = shard_grad[: org_grad.shape[0], :]
273
274
275
276
277
278
279
        if verbose and dist.get_rank() == 0:
            print(f"'{suffix}' grad: {org_grad}, {shard_grad}")

        grad_to_check[suffix] = {
            "org_grad": org_grad.float(),
            "shard_grad": shard_grad.float(),
            "rtol": rtol,
280
            "atol": atol,
281
282
283
284
285
286
        }

    return grad_to_check


# used by sam/blip2
287
288
289
290
291
292
293
294
295
296
def check_grad(
    org_model: Module,
    sharded_model: Module,
    layer_suffix: List[str],
    tp_group: ProcessGroup = None,
    dim: int = 0,
    atol: float = 1e-5,
    rtol: float = 1e-3,
    verbose: bool = False,
):
297
    for suffix in layer_suffix:
298
        org_grad = getattr_(org_model, suffix).weight.grad
299
300
301
        shard_grad = getattr_(sharded_model, suffix).weight.grad
        shard_weight = getattr_(sharded_model, suffix).weight
        if is_distributed_tensor(shard_weight) or is_customized_distributed_tensor(shard_weight):
302
            shard_grad_list = [torch.zeros_like(shard_grad).to("cuda") for _ in range(dist.get_world_size(tp_group))]
303
304
305
306
307
            dist.all_gather(shard_grad_list, shard_grad, tp_group)
            shard_grad = torch.cat(shard_grad_list, dim=dim)

        # embedding may be resized when using tensor parallel
        if shard_grad.shape[0] > org_grad.shape[0]:
308
            shard_grad = shard_grad[: org_grad.shape[0], :]
309
        if verbose and dist.get_rank() == 0:
310
            print(f"'{suffix}' grad: {org_grad}, {shard_grad}")
311

312
        assert_close(org_grad.float(), shard_grad.float(), rtol=rtol, atol=atol)
313
314


315
316
317
def unwrap_model(
    module: Module, base_model_class_name: Optional[str] = None, base_model_attribute_name: Optional[str] = None
):
318
319
320
321
322
323
324
    if isinstance(module, HybridParallelModule):
        module = module.unwrap()
    if base_model_class_name is None:
        return module
    if module.__class__.__name__ == base_model_class_name:
        return module
    return getattr(module, base_model_attribute_name, None)
325
326
327
328
329
330
331
332
333
334
335
336


def check_all_grad_tensors(check_tensors):
    """
    "org_grad": tensor to be compared from the original model
    "shard_grad": tensor to be compared from the sharded model
    """
    for suffix, check_info in check_tensors.items():
        org_grad = check_info["org_grad"]
        shard_grad = check_info["shard_grad"]
        rtol = check_info["rtol"]
        atol = check_info["atol"]
337
        assert_close(org_grad, shard_grad, atol=atol, rtol=rtol)