import copy
from contextlib import nullcontext
from typing import Any, Callable, Dict, List, Optional

import torch
import torch.distributed as dist
from torch import Tensor
from torch.distributed import ProcessGroup
from torch.nn import Module
from torch.optim import Adam, Optimizer
from torch.testing import assert_close

from colossalai.booster import Booster
from colossalai.booster.plugin import HybridParallelPlugin
from colossalai.booster.plugin.hybrid_parallel_plugin import HybridParallelModule
from colossalai.checkpoint_io.utils import gather_distributed_param
from colossalai.lazy import LazyInitContext
from colossalai.pipeline.stage_manager import PipelineStageManager
from colossalai.shardformer import ShardConfig, ShardFormer
from colossalai.shardformer._utils import getattr_
from colossalai.shardformer.policies.auto_policy import Policy
from colossalai.tensor.d_tensor.api import is_customized_distributed_tensor, is_distributed_tensor
from colossalai.tensor.padded_tensor.api import is_padded_tensor, to_unpadded_tensor

def build_model(
    model_fn,
    enable_fused_normalization=True,
    enable_tensor_parallelism=True,
    enable_flash_attention=False,
    enable_jit_fused=False,
    enable_sequence_parallelism=False,
    use_lazy_init: bool = False,
    dtype=torch.float32,
):
    ctx = LazyInitContext() if use_lazy_init else nullcontext()
    with ctx:
        # create new model
        org_model = model_fn()
        model_copy = copy.deepcopy(org_model)
    if use_lazy_init:
        ctx.materialize(org_model)

    # shard model
    shard_config = ShardConfig(
        enable_fused_normalization=enable_fused_normalization,
        enable_tensor_parallelism=enable_tensor_parallelism,
        enable_flash_attention=enable_flash_attention,
        enable_jit_fused=enable_jit_fused,
        enable_sequence_parallelism=enable_sequence_parallelism,
    )
    shard_former = ShardFormer(shard_config=shard_config)
    sharded_model, shared_params = shard_former.optimize(model_copy)
    return org_model.cuda().to(dtype), sharded_model.cuda().to(dtype)
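

# A minimal usage sketch for `build_model` (illustrative only; not called by
# the test suite). It assumes the distributed environment has already been
# launched (e.g. via `colossalai.launch_from_torch`) and that `transformers`
# is installed; `BertModel`/`BertConfig` are stand-ins for any model builder.
def _example_build_model_usage():
    from transformers import BertConfig, BertModel

    org_model, sharded_model = build_model(
        model_fn=lambda: BertModel(BertConfig(num_hidden_layers=2)),
        enable_tensor_parallelism=True,
        use_lazy_init=False,
        dtype=torch.float32,
    )
    # `org_model` is the unsharded reference; `sharded_model` holds the
    # tensor-parallel shards produced by ShardFormer.
    return org_model, sharded_model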


def build_pipeline_model(
    model_fn,
    stage_manager=None,
    enable_fused_normalization=False,
    enable_tensor_parallelism=False,
    use_lazy_init: bool = False,
    policy: Optional[Policy] = None,
):
    ctx = LazyInitContext() if use_lazy_init else nullcontext()
    with ctx:
        # create new model
        org_model = model_fn()
        model_copy = copy.deepcopy(org_model)
    if use_lazy_init:
        ctx.materialize(org_model)

    # shard model
    shard_config = ShardConfig(
        enable_fused_normalization=enable_fused_normalization,
        enable_tensor_parallelism=enable_tensor_parallelism,
        pipeline_stage_manager=stage_manager,
    )
    shard_former = ShardFormer(shard_config=shard_config)
    sharded_model, shared_params = shard_former.optimize(model_copy, policy=policy)
    return org_model.cuda(), sharded_model.cuda()
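

# Illustrative sketch for `build_pipeline_model` (not used by the tests). It
# assumes 2 pipeline ranks and an initialized distributed environment; the
# `ProcessGroupMesh`/`PipelineStageManager` pairing mirrors how the pipeline
# tests construct a stage manager.
def _example_build_pipeline_model_usage():
    from transformers import BertConfig, BertModel

    from colossalai.cluster import ProcessGroupMesh

    pg_mesh = ProcessGroupMesh(2)  # a single mesh axis with 2 ranks -> 2 stages
    stage_manager = PipelineStageManager(pg_mesh, pipeline_axis=0)
    return build_pipeline_model(
        lambda: BertModel(BertConfig(num_hidden_layers=4)),
        stage_manager=stage_manager,
    )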


def run_forward(original_model, sharded_model, data_gen_fn, output_transform_fn, loss_fn):
    # prepare input
    data = data_gen_fn()
    data = {k: v.cuda() for k, v in data.items()}
    # switch to train mode
    original_model.train()
    sharded_model.train()
    # run forward
    org_output = original_model(**data)
    org_output = output_transform_fn(org_output)
    org_loss = loss_fn(org_output)

    shard_output = sharded_model(**data)
    shard_output = output_transform_fn(shard_output)
    shard_loss = loss_fn(shard_output)
    return org_output, org_loss, shard_output, shard_loss
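

# A sketch of the three callables `run_forward` expects (illustrative; the
# real ones come from the model zoo under `tests/kit`): `data_gen_fn` builds a
# CPU batch, `output_transform_fn` normalizes the model output into a dict,
# and `loss_fn` reduces that dict to a scalar.
def _example_run_forward_callables():
    def data_gen_fn():
        return {
            "input_ids": torch.randint(0, 100, (2, 16)),
            "attention_mask": torch.ones(2, 16, dtype=torch.long),
        }

    def output_transform_fn(output):
        return {"last_hidden_state": output.last_hidden_state}

    def loss_fn(outputs):
        return outputs["last_hidden_state"].mean()

    return data_gen_fn, output_transform_fn, loss_fn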


def check_state_dict(org_model: Module, sharded_model: Module, name: str = ""):
    org_sd = org_model.state_dict()
    shard_sd = sharded_model.state_dict()
    for k, v in org_sd.items():
        assert k in shard_sd, f"{name} {k} not in sharded model"
        shard_v = shard_sd[k]
        assert v.shape == shard_v.shape, f"{name} {k} shape mismatch, {v.shape} vs {shard_v.shape}"
        assert v.dtype == shard_v.dtype, f"{name} {k} dtype mismatch, {v.dtype} vs {shard_v.dtype}"
        assert torch.equal(v, shard_v), f"{name} {k} value mismatch"


def build_model_from_hybrid_plugin(model_fn: Callable, loss_fn: Callable, test_config: Dict[str, Any]):
    use_lazy_init = False
    if "use_lazy_init" in test_config:
        use_lazy_init = test_config.pop("use_lazy_init")

    ctx = LazyInitContext() if use_lazy_init else nullcontext()
    with ctx:
        org_model = model_fn()
        sharded_model = copy.deepcopy(org_model)
    if use_lazy_init:
        ctx.materialize(org_model)
    org_model = org_model.cuda()

    org_optimizer = Adam(org_model.parameters(), lr=1e-3)
    sharded_optimizer = Adam(sharded_model.parameters(), lr=1e-3)
    criterion = loss_fn

    plugin = HybridParallelPlugin(**test_config)
    booster = Booster(plugin=plugin)

    sharded_model, sharded_optimizer, criterion, _, _ = booster.boost(sharded_model, sharded_optimizer, criterion)

    return (
        org_model,
        org_optimizer,
        sharded_model,
        sharded_optimizer,
        criterion,
        booster,
    )
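

# An illustrative `test_config` for `build_model_from_hybrid_plugin`: every
# key except `use_lazy_init` is forwarded verbatim to `HybridParallelPlugin`,
# so any plugin kwarg may appear here. The sizes below assume a 4-GPU run.
_EXAMPLE_HYBRID_TEST_CONFIG = {
    "tp_size": 2,
    "pp_size": 2,
    "num_microbatches": 4,
    "precision": "fp16",
    "initial_scale": 1,
    "use_lazy_init": False,
}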


def run_forward_backward_with_hybrid_plugin(
    org_model: Module,
    sharded_model: Module,
    sharded_optimizer: Optimizer,
    data_gen_fn: Callable,
    output_transform_fn: Callable,
    criterion: Callable,
    booster: Booster,
):
    org_model.cuda()
    sharded_model.cuda()

    def _criterion(outputs, inputs):
        outputs = output_transform_fn(outputs)
        loss = criterion(outputs)
        return loss

    data = data_gen_fn()

    # clone the generated batch so both models see identical inputs
    shard_test_data = {k: v.clone() for k, v in data.items()}
    unshard_test_data = {k: v.clone() for k, v in data.items()}

    sharded_model.train()
    if booster.plugin.stage_manager is not None:
        # pipeline schedules need a batch large enough to split into
        # microbatches, so repeat each tensor along the batch dimension
        for k, v in shard_test_data.items():
            if torch.is_tensor(v) or "Tensor" in v.__class__.__name__:
                new_shape = [1] * v.dim()
                new_shape[0] = 4
                shard_test_data[k] = v.to("cuda").repeat(*new_shape)

        data_iter = iter([shard_test_data])
        sharded_output = booster.execute_pipeline(
            data_iter,
            sharded_model,
            _criterion,
            sharded_optimizer,
            return_loss=True,
            return_outputs=True,
        )
        sharded_loss = sharded_output["loss"]
    else:
        shard_test_data = {k: v.cuda() for k, v in shard_test_data.items()}
        sharded_output = sharded_model(**shard_test_data)
        sharded_loss = criterion(sharded_output)
        sharded_optimizer.backward(sharded_loss)

    org_model.train()
    if booster.plugin.stage_manager is not None:
        # keep the unsharded batch consistent with the pipeline batch size
        for k, v in unshard_test_data.items():
            if torch.is_tensor(v) or "Tensor" in v.__class__.__name__:
                new_shape = [1] * v.dim()
                new_shape[0] = 4
                unshard_test_data[k] = v.to("cuda").repeat(*new_shape)
    unshard_test_data = {k: v.cuda() for k, v in unshard_test_data.items()}
    org_output = org_model(**unshard_test_data)
    org_loss = criterion(org_output)
    org_loss.backward()

    return org_loss, org_output, sharded_loss, sharded_output
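

# End-to-end sketch tying the helpers together (illustrative only): build the
# model pair, run forward/backward, then compare results on the ranks that
# hold them. Assumes model-zoo style callables as sketched above.
def _example_hybrid_parallel_check(model_fn, data_gen_fn, output_transform_fn, loss_fn, test_config):
    org_model, org_optimizer, sharded_model, sharded_optimizer, criterion, booster = build_model_from_hybrid_plugin(
        model_fn, loss_fn, test_config
    )
    org_loss, org_output, sharded_loss, sharded_output = run_forward_backward_with_hybrid_plugin(
        org_model, sharded_model, sharded_optimizer, data_gen_fn, output_transform_fn, criterion, booster
    )
    stage_manager = booster.plugin.stage_manager
    if stage_manager is None or stage_manager.is_last_stage(ignore_chunk=True):
        check_loss(org_loss, sharded_loss, atol=1e-5, rtol=1e-3)
        check_output_hidden_state(org_output, sharded_output, stage_manager)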


def check_output_hidden_state(
    org_output: Tensor,
    sharded_output: Tensor,
    stage_manager: Optional[PipelineStageManager] = None,
    atol: float = 1e-5,
    rtol: float = 1e-3,
):
    org_hidden_state = org_output.last_hidden_state

    if stage_manager and stage_manager.is_last_stage(ignore_chunk=True):
        sharded_hidden_state = sharded_output["outputs"]["last_hidden_state"]
    else:
        sharded_hidden_state = sharded_output.last_hidden_state

    assert_close(org_hidden_state.float(), sharded_hidden_state.float(), atol=atol, rtol=rtol)


def check_loss(org_loss: Tensor, sharded_loss: Tensor, atol: float = 1e-5, rtol: float = 1e-3):
    assert torch.allclose(org_loss.float(), sharded_loss.float(), atol=atol, rtol=rtol)


def check_weight(
    org_model: Module,
    sharded_model: Module,
    layer_suffix: List[str],
    tp_group: Optional[ProcessGroup] = None,
    dim: int = 0,
    atol: float = 1e-5,
    rtol: float = 1e-3,
    verbose: bool = False,
):
    for suffix in layer_suffix:
        org_weight = getattr_(org_model, suffix).weight
        sharded_weight = getattr_(sharded_model, suffix).weight

        # skip if layer is not held by this process
        if sharded_weight is None:
            continue

        if is_distributed_tensor(sharded_weight) or is_customized_distributed_tensor(sharded_weight):
            sharded_weight = gather_distributed_param(sharded_weight, keep_vars=False)

        if is_padded_tensor(sharded_weight):
            sharded_weight = to_unpadded_tensor(sharded_weight)

        if verbose and dist.get_rank() == 0:
            print(f"'{suffix}' weight: {org_weight}, {sharded_weight}")

        assert_close(org_weight.float(), sharded_weight.float(), atol=atol, rtol=rtol)
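

# Illustrative call (commented; the layer paths are model-specific
# assumptions): entries in `layer_suffix` are attribute paths resolved by
# `getattr_`, including indexed paths such as "encoder.layer[0]", e.g.
#
#   check_weight(org_model, unwrap_model(sharded_model),
#                ["embeddings.word_embeddings", "encoder.layer[0].output.dense"],
#                tp_group=tp_group, dim=0)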


def get_grad_tensors_for_check(
    org_model: Module,
    sharded_model: Module,
    layer_suffix: List[str],
    tp_group: Optional[ProcessGroup] = None,
    dim: int = 0,
    atol: float = 1e-5,
    rtol: float = 1e-3,
    verbose: bool = False,
    name: Optional[str] = None,
):
    grad_to_check = {}
    for suffix in layer_suffix:
        org_grad = getattr_(org_model, suffix).weight.grad
        shard_grad = getattr_(sharded_model, suffix).weight.grad
        shard_weight = getattr_(sharded_model, suffix).weight
        if is_distributed_tensor(shard_weight) or is_customized_distributed_tensor(shard_weight):
            # gather the sharded gradient from every rank in the tensor-parallel group
            shard_grad_list = [torch.zeros_like(shard_grad).to("cuda") for _ in range(dist.get_world_size(tp_group))]
            dist.all_gather(shard_grad_list, shard_grad, tp_group)
            shard_grad = torch.cat(shard_grad_list, dim=dim)

        # embedding may be resized when using tensor parallel
        if shard_grad.shape[0] > org_grad.shape[0]:
            shard_grad = shard_grad[: org_grad.shape[0], :]

        if verbose and dist.get_rank() == 0:
            print(f"'{suffix}' grad: {org_grad}, {shard_grad}")

        grad_to_check[suffix] = {
            "org_grad": org_grad.float(),
            "shard_grad": shard_grad.float(),
            "rtol": rtol,
            "atol": atol,
        }

    return grad_to_check


# used by sam/blip2
def check_grad(
    org_model: Module,
    sharded_model: Module,
    layer_suffix: List[str],
    tp_group: Optional[ProcessGroup] = None,
    dim: int = 0,
    atol: float = 1e-5,
    rtol: float = 1e-3,
    verbose: bool = False,
):
    for suffix in layer_suffix:
        org_grad = getattr_(org_model, suffix).weight.grad
        shard_grad = getattr_(sharded_model, suffix).weight.grad
        shard_weight = getattr_(sharded_model, suffix).weight
        if is_distributed_tensor(shard_weight) or is_customized_distributed_tensor(shard_weight):
            shard_grad_list = [torch.zeros_like(shard_grad).to("cuda") for _ in range(dist.get_world_size(tp_group))]
            dist.all_gather(shard_grad_list, shard_grad, tp_group)
            shard_grad = torch.cat(shard_grad_list, dim=dim)

        # embedding may be resized when using tensor parallel
        if shard_grad.shape[0] > org_grad.shape[0]:
            shard_grad = shard_grad[: org_grad.shape[0], :]

        if verbose and dist.get_rank() == 0:
            print(f"'{suffix}' grad: {org_grad}, {shard_grad}")

        assert_close(org_grad.float(), shard_grad.float(), rtol=rtol, atol=atol)


def unwrap_model(
    module: Module,
    base_model_class_name: Optional[str] = None,
    base_model_attribute_name: Optional[str] = None,
):
    # peel the HybridParallelModule wrapper added by booster.boost, then
    # optionally descend to the named backbone attribute
    if isinstance(module, HybridParallelModule):
        module = module.unwrap()
    if base_model_class_name is None:
        return module
    if module.__class__.__name__ == base_model_class_name:
        return module
    return getattr(module, base_model_attribute_name, None)
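

# Illustrative: after `booster.boost`, the sharded model is wrapped in a
# `HybridParallelModule`. `unwrap_model` peels that wrapper and can descend to
# the backbone, e.g. for a BertForSequenceClassification:
#
#   bert = unwrap_model(sharded_model, "BertModel", "bert")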


def check_all_grad_tensors(check_tensors):
    """
    "org_grad": tensor to be compared from the original model
    "shard_grad": tensor to be compared from the sharded model
    """
    for suffix, check_info in check_tensors.items():
        org_grad = check_info["org_grad"]
        shard_grad = check_info["shard_grad"]
        rtol = check_info["rtol"]
        atol = check_info["atol"]
        assert_close(org_grad, shard_grad, atol=atol, rtol=rtol)
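

# Illustrative gradient-check workflow (a sketch mirroring how the test files
# use these helpers): gather grads for a few layers right after backward, then
# assert on all of them in one pass. The layer path below is a hypothetical
# example; real tests pass model-specific suffixes.
def _example_grad_check(org_model, sharded_model, tp_group):
    grads_to_check = get_grad_tensors_for_check(
        org_model,
        unwrap_model(sharded_model),
        ["encoder.layer[0].output.dense"],
        tp_group=tp_group,
        dim=0,
    )
    check_all_grad_tensors(grads_to_check)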