"vscode:/vscode.git/clone" did not exist on "b55deb0662005e5db37075163a38487ff006eb68"
test_shard_param.py 4.86 KB
Newer Older
Jiarui Fang's avatar
Jiarui Fang committed
1
2
3
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

4
from copy import deepcopy
Jiarui Fang's avatar
Jiarui Fang committed
5
6
from functools import partial

7
import colossalai
Jiarui Fang's avatar
Jiarui Fang committed
8
9
10
import pytest
import torch
import torch.multiprocessing as mp
11
from colossalai.logging import disable_existing_loggers, get_dist_logger
Jiarui Fang's avatar
Jiarui Fang committed
12
from colossalai.utils import free_port
ver217's avatar
ver217 committed
13
from colossalai.zero.shard_utils import (BucketTensorShardStrategy, TensorShardStrategy)
14
15
from colossalai.zero.sharded_param import ShardedParam, ShardedTensor
from colossalai.zero.sharded_param.sharded_param import ShardedParamV2
16
from tests.components_to_test.registry import non_distributed_component_funcs
ver217's avatar
ver217 committed
17
from tests.test_zero_data_parallel.common import CONFIG, allclose
Jiarui Fang's avatar
Jiarui Fang committed
18

Jiarui Fang's avatar
Jiarui Fang committed
19

ver217's avatar
ver217 committed
20
def _run_shard_tensor(rank, world_size, port, shard_strategy):
    """Worker: shard and then gather a single ``ShardedTensor``.

    Args:
        rank: rank of this worker process.
        world_size: number of worker processes in the run.
        port: port on ``localhost`` passed to ``colossalai.launch``.
        shard_strategy: strategy class; it is instantiated here with
            ``process_group=None``.
    """
    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')

    full_shape = [world_size * 2, 3]
    t = ShardedTensor(tensor=torch.randn(world_size * 2, 3))
    # Before any sharding both the recorded and the current shape are the full shape.
    assert list(t.origin_shape) == full_shape
    assert list(t.shape) == full_shape

    strategy = shard_strategy(process_group=None)

    # test shard strategy
    strategy.shard([t])
    # NOTE(review): 6 presumably is (world_size * 2 * 3) / world_size flat elements per rank -- confirm.
    assert list(t.shape) == [6], f"{list(t.shape)} vs 6"
    strategy.gather([t])
    assert list(t.shape) == full_shape, f"{list(t.shape)} vs {full_shape}"
Jiarui Fang's avatar
Jiarui Fang committed
33
34
35


@pytest.mark.dist
@pytest.mark.parametrize("world_size", [1, 2])
@pytest.mark.parametrize("shard_strategy", [TensorShardStrategy, BucketTensorShardStrategy])
def test_shard_tensor(world_size, shard_strategy):
    """Spawn ``world_size`` processes that each run ``_run_shard_tensor``."""
    # Everything except the rank is bound here; the rank is left as the
    # remaining positional argument of the worker.
    worker_kwargs = dict(world_size=world_size, port=free_port(), shard_strategy=shard_strategy)
    worker = partial(_run_shard_tensor, **worker_kwargs)
    mp.spawn(worker, nprocs=world_size)


43
def _run_shard_param_v2(rank, world_size, port):
    """Worker: wrap a parameter in ``ShardedParamV2`` and drop its torch payload.

    Args:
        rank: rank of this worker process.
        world_size: number of worker processes in the run.
        port: port on ``localhost`` passed to ``colossalai.launch``.
    """
    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')

    param = torch.nn.Parameter(torch.randn(2, 3))
    # Independent copy, so the comparison below does not alias the wrapped parameter.
    param_ref = deepcopy(param)
    sparam = ShardedParamV2(param=param, process_group=None)

    # Assert on the comparison: a bare call discards the result and can never fail the test.
    # NOTE(review): assumes the project `allclose` helper returns a bool like torch.allclose -- confirm.
    assert allclose(sparam.data.payload, param_ref.data), \
        "ShardedParamV2 payload differs from the reference parameter data"

    sparam.remove_torch_payload()
    # After the payload is removed, the original parameter holds a single element.
    assert param.data.numel() == 1
Jiarui Fang's avatar
Jiarui Fang committed
54
55


56
@pytest.mark.dist
@pytest.mark.parametrize("world_size", [1, 2])
def test_shard_param_v2(world_size):
    """Spawn ``world_size`` processes that each run ``_run_shard_param_v2``."""
    port = free_port()
    worker = partial(_run_shard_param_v2, world_size=world_size, port=port)
    mp.spawn(worker, nprocs=world_size)
Jiarui Fang's avatar
Jiarui Fang committed
61

62
63

def _run_test_shard_param(rank, world_size, port):
    """Worker: shard, then gather, every parameter of each registered test model.

    Args:
        rank: rank of this worker process.
        world_size: number of worker processes in the run.
        port: port on ``localhost`` passed to ``colossalai.launch``.
    """
    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')

    # Build a ShardedParamV2 next to an untouched copy and print both for inspection.
    raw_param = torch.nn.Parameter(torch.randn(2, 3))
    raw_param_copy = deepcopy(raw_param)
    sharded_v2 = ShardedParamV2(param=raw_param, process_group=None)
    print(sharded_v2.data)
    print(raw_param_copy.data)

    logger = get_dist_logger()
    cpu = torch.device('cpu')
    for get_components_func in non_distributed_component_funcs:
        # Only the first returned component (the model builder) is used.
        model_builder, *_ = get_components_func()
        model = model_builder(checkpoint=True)

        # add an attribute as col_attr to hijack the access to param.data
        for _, p in model.named_parameters():
            # Expected shard size: ceil(numel / world_size).
            expected_numel = (p.numel() + world_size - 1) // world_size
            p.col_attr = ShardedParam(p)
            p.col_attr.shard()
            shard = p.col_attr.payload(cpu)
            assert expected_numel == shard.numel()

        for _, p in model.named_parameters():
            p.col_attr.gather()
            # The payload is fetched after gathering, but nothing is checked on it.
            p.col_attr.payload(cpu)

        disable_existing_loggers([logger])
Jiarui Fang's avatar
Jiarui Fang committed
89

Jiarui Fang's avatar
Jiarui Fang committed
90

Jiarui Fang's avatar
Jiarui Fang committed
91
@pytest.mark.dist
@pytest.mark.parametrize("world_size", [1, 2])
def test_shard_param(world_size):
    """Spawn ``world_size`` processes that each run ``_run_test_shard_param``."""
    port = free_port()
    worker = partial(_run_test_shard_param, world_size=world_size, port=port)
    mp.spawn(worker, nprocs=world_size)

Jiarui Fang's avatar
Jiarui Fang committed
97

jiaruifang's avatar
jiaruifang committed
98
def _run_init_shard_param(rank, world_size, port):
    """Worker: build ``ShardedParam`` from a parameter and from a shape, checking payload shapes.

    Args:
        rank: rank of this worker process.
        world_size: number of worker processes in the run.
        port: port on ``localhost`` passed to ``colossalai.launch``.
    """
    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    cuda = torch.device('cuda')

    # Case 1: built from an existing parameter, flagged as already sharded.
    param = torch.nn.Parameter(data=torch.rand(world_size, 3))
    sparam = ShardedParam(param, None, True)
    payload = sparam.payload(cuda)
    assert list(payload.shape) == [3]
    del sparam

    # Cases 2 and 3: built from a bare shape on CPU, first sharded and then unsharded.
    param_shape = (world_size, 3)
    for is_sharded, expected_shape in ((True, [3]), (False, [world_size, 3])):
        sparam = ShardedParam(param_shape, process_group=None, is_sharded=is_sharded, device=torch.device('cpu'))
        payload = sparam.payload(cuda)
        assert list(payload.shape) == expected_shape
115
116


Jiarui Fang's avatar
Jiarui Fang committed
117
@pytest.mark.dist
@pytest.mark.parametrize("world_size", [1, 4])
def test_init_shard_param(world_size):
    """Spawn ``world_size`` processes that each run ``_run_init_shard_param``."""
    port = free_port()
    worker = partial(_run_init_shard_param, world_size=world_size, port=port)
    mp.spawn(worker, nprocs=world_size)


Jiarui Fang's avatar
Jiarui Fang committed
124
if __name__ == '__main__':
    # Direct-run entry point: call each test once with fixed arguments instead of going through pytest.
    test_shard_tensor(world_size=2, shard_strategy=TensorShardStrategy)
    test_shard_param(world_size=2)
    test_shard_param_v2(world_size=2)
    test_init_shard_param(world_size=4)