rpc_test_utils.py 4.19 KB
Newer Older
1
2
import os
import argparse
3
import warnings
4
5
6
7
8
9

import torch
from torch import nn
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc
from torch.optim import SGD, Adam, RMSprop, Optimizer
10
from torch._C._distributed_rpc import _is_current_rpc_agent_set
11
import torch.distributed as dist
12
13
from colorama import Back, Style

14
15
16
17
from colossalai.pipeline.pipeline_process_group import ppg
from colossalai.logging import disable_existing_loggers
from colossalai import launch

18
19
rpc_is_initialized = _is_current_rpc_agent_set

20
21
22
23
24
25
26
27
28
29
30
31
32

def color_debug(text, prefix=' ', color='blue'):
    color = color.upper()
    print(getattr(Back, color), prefix, Style.RESET_ALL, text)


class RpcTestModel(nn.Module):

    def __init__(self, stage_id, actual_stage_num, feat_num, h) -> None:
        super().__init__()
        self.rank = stage_id
        self.is_last_rank = stage_id == actual_stage_num - 1
        self.linear_name = f'linear_{stage_id}'
33

34
        if stage_id == 0:
35
            linear = nn.Linear(feat_num, h)
36
        elif stage_id == actual_stage_num - 1:
37
            linear = nn.Linear(h, 1)
38
        else:
39
40
41
            linear = nn.Linear(h, h)

        setattr(self, self.linear_name, linear)
42
43
44
45
46
47
48
49
50
51
52
53

    def forward(self, x) -> torch.Tensor:
        linear: nn.Module = getattr(self, self.linear_name)
        out: torch.Tensor = linear(x)

        if self.is_last_rank:
            out = out.sum()
        return out


def parse_args():
    parser = argparse.ArgumentParser()
54
    parser.add_argument('--epoch', type=int, default=1)
55
    parser.add_argument('--world_size', type=int, default=2)
56
    parser.add_argument('--batch_size', type=int, default=16)
57
58
    parser.add_argument('--dp_degree', type=int, default=1)
    parser.add_argument('--tp_degree', type=int, default=1)
59
60
61
62
    parser.add_argument('--num_microbatches', type=int, default=2)
    parser.add_argument('--chunk', type=int, default=1)
    parser.add_argument('--use_checkpoint', action='store_true')
    parser.add_argument('--optimizer', type=str, choices=['SGD', 'Adam', 'RMSprop'], default='SGD')
63
    parser.add_argument('--device', type=str, choices=['cpu', 'cuda'], default='cuda')
64
65
66
67
68
69
    parser.add_argument('--master_addr', type=str, default='localhost')
    parser.add_argument('--master_port', type=str, default='29020')
    parser.add_argument('--num_worker_threads', type=str, default=128)
    return parser.parse_args()


70
71
72
73
74
75
76
77
78
79
80
81
82
def pg_parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--world_size', type=int, default=4)
    parser.add_argument('--dp_degree', type=int, default=2)
    parser.add_argument('--tp_degree', type=int, default=1)
    parser.add_argument('--chunk', type=int, default=1)
    parser.add_argument('--num_worker_threads', type=str, default=128)
    parser.add_argument('--device', type=str, choices=['cpu', 'cuda'], default='cuda')
    parser.add_argument('--master_addr', type=str, default='localhost')
    parser.add_argument('--master_port', type=str, default='29020')
    return parser.parse_args()


83
84
85
86
def run_worker(rank, args, master_func):
    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ['MASTER_PORT'] = args.master_port

87
    device = args.device
88
    world_size = args.world_size
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
    dp_degree = args.dp_degree
    tp_degree = args.tp_degree
    num_worker_threads = args.num_worker_threads
    host = args.master_addr
    port = args.master_port
    backend = 'nccl' if device == 'cuda' else 'gloo'

    disable_existing_loggers()

    launch(dict(), rank, world_size, host, int(port), backend, verbose=False)
    ppg.set_global_info(rank=rank,
                        world_size=world_size,
                        dp_degree=dp_degree,
                        tp_degree=tp_degree,
                        num_worker_threads=num_worker_threads,
                        device=device)
105
106
107
108
109

    # in rpc mode, only rank 0 is needed to be coded
    if rank == 0:
        master_func(args)
    # barrier here
110
111
112
113
    if rpc_is_initialized():
        rpc.shutdown()
    else:
        warnings.warn("RPC has not been initialized")
114
115
116
117
118
119


def rpc_run(args, master_func):
    world_size = args.world_size
    assert args.num_microbatches >= args.world_size, "num_microbatches cannot be fewer than world_size!"
    mp.spawn(run_worker, args=(args, master_func), nprocs=world_size)