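"""Tests for Gemini chunk-size search and chunk manager initialization.

Runs under world sizes 1 and 4: `search_chunk_configuration` should find a
chunk size of 31616 for the GPT-2 test model, and `init_chunk_manager`
should record that size for the global data-parallel group.
"""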
import pytest
import torch

import colossalai
from colossalai.testing import rerun_if_address_is_in_use, spawn
from colossalai.utils import get_current_device
from colossalai.zero.gemini.chunk import init_chunk_manager, search_chunk_configuration
from tests.components_to_test.registry import non_distributed_component_funcs


def exam_search_chunk_size():
    world_size = torch.distributed.get_world_size()

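    # fetch the registered GPT-2 test component; only the model builder is used here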
    get_components_func = non_distributed_component_funcs.get_callable("gpt2")
    model_builder, *_ = get_components_func()

    # build the model; the configuration search only inspects its parameters
    model = model_builder()
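    # search_chunk_configuration returns the config dict first, followed by
    # additional search statistics, hence the ``*_`` unpacking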
    config_dict, *_ = search_chunk_configuration(
        model, search_range_m=1, search_interval=16, min_chunk_size_m=0, filter_exlarge_params=True
    )

    for key in config_dict:
        chunk_size = config_dict[key]["chunk_size"]
        # the searched chunk size is the same for both tested world sizes,
        # so a single expected value covers every rank count
        assert chunk_size == 31616


def exam_chunk_manager():
    world_size = torch.distributed.get_world_size()

    get_components_func = non_distributed_component_funcs.get_callable("gpt2")
    model_builder, *_ = get_components_func()

    sharded_ddp_model = model_builder()
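    # strict_ddp_flag=True places every parameter in the global DP group,
    # so the manager should end up with a single chunk-size entry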
    chunk_manager = init_chunk_manager(
        sharded_ddp_model,
        get_current_device(),
        hidden_dim=16,
        search_range_m=1,
        min_chunk_size_m=0,
        filter_exlarge_params=True,
        strict_ddp_flag=True,
    )
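    # dp_degree_chunk_size_dict maps each DP degree to its searched chunk size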
    config_dict = chunk_manager.dp_degree_chunk_size_dict
    assert len(config_dict) == 1
    assert config_dict[world_size] == 31616


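# each spawned worker initializes the NCCL backend, then runs both checks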
def run_dist(rank, world_size, port):
    colossalai.launch(config={}, rank=rank, world_size=world_size, host="localhost", port=port, backend="nccl")
    exam_search_chunk_size()
    exam_chunk_manager()


@pytest.mark.dist
@pytest.mark.parametrize("world_size", [1, 4])
@rerun_if_address_is_in_use()
def test_search(world_size):
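    # spawn launches `world_size` processes, each running run_dist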
    spawn(run_dist, world_size)


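# run the 4-process case directly when invoked as a script
# (NCCL expects one device per process, so this needs 4 GPUs)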
if __name__ == "__main__":
    test_search(4)