# test_middleware_1f1b.py
# Test of the RPC-based 1F1B pipeline engine using the FX middleware topology.
import os
from functools import partial

import pytest
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
from torch import nn
from torch._C._distributed_rpc import _is_current_rpc_agent_set

from colossalai import launch
from colossalai.fx import ColoTracer
from colossalai.fx.passes.adding_split_node_pass import split_with_split_nodes_pass, balanced_split_pass
from colossalai.logging import disable_existing_loggers
from colossalai.pipeline.middleware.adaptor import get_fx_topology
from colossalai.pipeline.pipeline_process_group import ppg
from colossalai.pipeline.rpc._pipeline_schedule import OneFOneBPipelineEngine
from colossalai.testing import parameterize, rerun_if_address_is_in_use

from rpc_test_utils import MLP, DAG_MLP

# global variable for model created
batch_size = 16
dim = 10
23
rpc_is_initialized = _is_current_rpc_agent_set

def create_partition_module(pp_rank: int, stage_num: int, model, data_kwargs):
    """Trace ``model``, split it into stages and return the one for ``pp_rank``.

    Args:
        pp_rank: Pipeline rank whose submodule should be returned.
        stage_num: Number of stages handed to ``balanced_split_pass``.
        model: Module to trace; it is put into eval mode first.
        data_kwargs: Mapping from forward-argument name to an example tensor.
            Each tensor is moved to the ``meta`` device and used as a tracing
            meta argument.

    Returns:
        The element at index ``pp_rank + 1`` of the submodules produced by
        ``split_with_split_nodes_pass``.
    """
    model.eval()

    # Trace on meta tensors so that no real data is needed for graph capture.
    meta_args = {name: tensor.to('meta') for name, tensor in data_kwargs.items()}
    graph = ColoTracer().trace(root=model, meta_args=meta_args)
    gm = torch.fx.GraphModule(model, graph, model.__class__.__name__)

    annotated_model = balanced_split_pass(gm, stage_num)
    top_module, split_submodules = split_with_split_nodes_pass(annotated_model, merge_output=True)

    # Attach the topology of the top module to every GraphModule submodule.
    topo = get_fx_topology(top_module)
    for submodule in split_submodules:
        if isinstance(submodule, torch.fx.GraphModule):
            submodule._topo = topo

    # NOTE(review): the index is offset by one; presumably entry 0 of
    # ``split_submodules`` is not a pipeline stage -- confirm against the pass.
    return split_submodules[pp_rank + 1]


def partition(model, data_kwargs: dict, pp_rank: int, chunk: int, stage_num: int):
    """Partition function handed to the pipeline engine.

    ``model`` and ``data_kwargs`` are bound in advance with ``functools.partial``
    by ``run_master``; the remaining arguments are left for the engine to supply.

    Args:
        model: Module to split.
        data_kwargs: Example inputs used for tracing.
        pp_rank: Pipeline rank whose stage is requested.
        chunk: Accepted as part of the signature but not used here.
        stage_num: Total number of pipeline stages.

    Returns:
        The submodule returned by ``create_partition_module`` for ``pp_rank``.
    """
    # Re-seed before each call to create_partition_module.
    torch.manual_seed(1024)
    return create_partition_module(pp_rank, stage_num, model, data_kwargs)

def run_master(model_cls, world_size, forward_only):
    """Build the 1F1B pipeline engine and run a few iterations on it.

    Args:
        model_cls: Model class to test; must be ``MLP`` or ``DAG_MLP``.
        world_size: Number of processes; also used as the number of stages.
        forward_only: When true, no optimizer is initialized, ``labels`` is
            ``None`` and the engine is asked to run the forward pass only.

    Raises:
        ValueError: If ``model_cls`` is neither ``MLP`` nor ``DAG_MLP``.
    """
    torch.manual_seed(100)

    epoch = 3
    device = 'cuda'
    stage_num = world_size
    chunk = 1
    num_microbatches = 8
    # NOTE(review): this is a truthy string rather than a bool; it looks like a
    # leftover argparse action name. Kept as-is to preserve behavior -- confirm
    # how the engine interprets ``checkpoint`` before changing it to ``True``.
    use_checkpoint = 'store_true'

    # Names of the forward arguments used to trace each supported model.
    if model_cls == MLP:
        input_names = ('x',)
    elif model_cls == DAG_MLP:
        input_names = ('x', 'y')
    else:
        # Fail here instead of hitting an unbound local further down.
        raise ValueError(f"unsupported model_cls: {model_cls!r}")

    model = model_cls(dim, stage_num * 3)
    labels = None if forward_only else 1

    # Example inputs; only used for tracing inside the partition function.
    data_kwargs = {name: torch.zeros((batch_size, dim)) for name in input_names}

    engine = OneFOneBPipelineEngine(
        partition_fn=partial(partition, model, data_kwargs),
        stage_num=stage_num,
        num_microbatches=num_microbatches,
        device=device,
        chunk=chunk,
        checkpoint=use_checkpoint,
    )
    if not forward_only:
        engine.initialize_optimizer(getattr(torch.optim, 'SGD'), lr=1e-3)

    for _ in range(epoch):
        input_x = torch.randn((batch_size, dim), device=device)
        input_y = torch.randn((batch_size, dim), device=device)
        # Both 'x' and 'y' are passed for either model class; the result is
        # not inspected.
        engine.forward_backward({'x': input_x, 'y': input_y}, labels=labels, forward_only=forward_only)


def run_worker(rank, model_cls, world_size, forward_only, master_func):
    """Per-process entry point: set up the distributed context, then run the test.

    Args:
        rank: Rank of this process.
        model_cls: Model class forwarded to ``master_func``.
        world_size: Total number of processes.
        forward_only: Forwarded to ``master_func``.
        master_func: Callable invoked on rank 0 only, as
            ``master_func(model_cls, world_size, forward_only)``.
    """
    master_addr = 'localhost'
    master_port = 29020
    os.environ['MASTER_ADDR'] = master_addr
    os.environ['MASTER_PORT'] = str(master_port)

    disable_existing_loggers()

    launch(dict(), rank, world_size, master_addr, master_port, 'nccl', verbose=False)
    ppg.set_global_info(rank=rank,
                        world_size=world_size,
                        dp_degree=1,
                        tp_degree=1,
                        num_worker_threads=128,
                        device='cuda')

    # in rpc mode, only rank 0 is needed to be coded
    if rank == 0:
        master_func(model_cls, world_size, forward_only)

    # barrier here
    # NOTE(review): presumably the default graceful shutdown is what acts as the
    # barrier across ranks -- confirm against the torch RPC documentation.
    if rpc_is_initialized():
        rpc.shutdown()
    
@pytest.mark.skip("skip due to CI torch version 1.11")
@parameterize('model_cls', [MLP, DAG_MLP])
@parameterize('forward_only', [True, False])
@pytest.mark.dist
@rerun_if_address_is_in_use()
def test_pp_middleware_fwd(model_cls, forward_only):
    """Spawn four worker processes that run the pipeline test.

    Args:
        model_cls: Model class under test, supplied by ``parameterize``.
        forward_only: Whether to run the forward pass only, supplied by
            ``parameterize``.
    """
    world_size = 4
    master_func = run_master
    # ``run_worker`` takes ``rank`` first; the remaining arguments come from ``args``.
    mp.spawn(run_worker, args=(model_cls, world_size, forward_only, master_func), nprocs=world_size)


if __name__ == "__main__":
    # Called without arguments; the decorators on the test are expected to supply them.
    test_pp_middleware_fwd()