import os
import random

import numpy as np
import torch
import torch.distributed as dist

from colossalai.context import ParallelMode
from colossalai.core import global_context as gpc
from colossalai.tensor import ComputePattern, ComputeSpec, ShardSpec


def set_seed(seed):
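    """Seed Python, NumPy and PyTorch RNGs and make cuDNN deterministic for reproducible tests."""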
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def check_equal(A, B):
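    """Assert that A and B are element-wise close within loose tolerances."""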
    assert torch.allclose(A, B, rtol=1e-3, atol=1e-1)


def replace_parameter_add_grad(layer, weight=None, bias=None):
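    """Replace the layer's weight/bias with the given tensors and enable gradients on them.

    The existing parameter is removed with ``delattr`` first so the new tensor can be assigned
    as a regular attribute.
    """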
    if weight is not None:
        delattr(layer, 'weight')
        setattr(layer, 'weight', weight)
        layer.weight.requires_grad = True
    if bias is not None:
        delattr(layer, 'bias')
        setattr(layer, 'bias', bias)
        layer.bias.requires_grad = True


def broadcast_tensor_chunk(tensor, chunk_size=1, local_rank=0):
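    """Broadcast the tensor from rank 0, then return this rank's chunk along the last dimension."""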
    dist.broadcast(tensor, src=0)
    tensor_chunk = torch.chunk(tensor, chunk_size, dim=-1)[local_rank]
    return tensor_chunk.clone()


def tensor_equal(A, B):
    return torch.allclose(A, B, rtol=1e-3, atol=1e-1)


def tensor_shard_equal(tensor: torch.Tensor, shard: torch.Tensor, rank, world_size):
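    """Check that ``shard`` matches the corresponding chunk of the full ``tensor``.

    Shapes that already match are compared directly; otherwise the tensor is chunked along the
    single differing dimension. ``rank`` and ``world_size`` default to the 1D parallel group.
    """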
    assert tensor.ndim == shard.ndim
    if tensor.shape == shard.shape:
        return tensor_equal(tensor, shard)
    else:
        dims_not_eq = torch.nonzero(torch.tensor(tensor.shape) != torch.tensor(shard.shape))
        if dims_not_eq.numel() == 1:
            # 1D shard
            dim = dims_not_eq.item()
            if world_size is None:
                world_size = gpc.get_world_size(ParallelMode.PARALLEL_1D)
            if rank is None:
                rank = gpc.get_local_rank(ParallelMode.PARALLEL_1D)
            return tensor_equal(tensor.chunk(world_size, dim)[rank], shard)
        else:
            raise NotImplementedError


def split_param_single_dim_tp1d(dim, param, pg):
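    """Shard ``param`` along ``dim`` across the tensor-parallel ranks of ``pg`` (1D TP)."""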
    spec = (ShardSpec([dim], [pg.tp_world_size()]), ComputeSpec(ComputePattern.TP1D))
    if param.process_group.tp_world_size() == 1:
        param.set_process_group(pg)
    param.set_tensor_spec(*spec)


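# "row" splitting shards dim 0 of the parameter, "col" splitting shards its last dim.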
def split_param_row_tp1d(param, pg):
    split_param_single_dim_tp1d(0, param, pg)


def split_param_col_tp1d(param, pg):
    split_param_single_dim_tp1d(-1, param, pg)


def debug_print(ranks, *args):
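    """Print only on the listed ranks, then synchronize all ranks with a barrier."""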
    if dist.get_rank() in ranks:
        print(*args)
    dist.barrier()