from copy import deepcopy
from functools import partial

import colossalai
import pytest
import torch
import torch.multiprocessing as mp
from colossalai.testing import parameterize, rerun_on_exception
from colossalai.utils import free_port
from colossalai.zero.shard_utils import BucketTensorShardStrategy, TensorShardStrategy
from colossalai.zero.sharded_param import ShardedTensor
from colossalai.zero.sharded_param.sharded_param import ShardedParamV2
from tests.test_zero_data_parallel.common import CONFIG, allclose


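# Note: colossalai's @parameterize re-runs the decorated function once per listed
# value, so both shard strategies are exercised inside a single spawned process.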
@parameterize("shard_strategy_class", [TensorShardStrategy, BucketTensorShardStrategy])
def run_shard_tensor_with_strategy(shard_strategy_class, world_size):
    t = ShardedTensor(tensor=torch.randn(world_size * 2, 3))
    assert list(t.origin_shape) == [world_size * 2, 3]
    assert list(t.shape) == [world_size * 2, 3]

    shard_strategy = shard_strategy_class()

    # Test the shard strategy: shard() leaves each rank a flat slice of the
    # world_size * 2 * 3 elements, i.e. 6 per rank; gather() restores the full shape.
    shard_strategy.shard([t])
    assert list(t.shape) == [6], f"{list(t.shape)} vs [6]"
    shard_strategy.gather([t])
    assert list(t.shape) == [world_size * 2, 3], f"{list(t.shape)} vs {[world_size * 2, 3]}"


def _run_shard_tensor(rank, world_size, port):
    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    run_shard_tensor_with_strategy(world_size=world_size)


@pytest.mark.dist
@pytest.mark.parametrize("world_size", [1, 2])
@rerun_on_exception(exception_type=mp.ProcessRaisedException, pattern=".*Address already in use.*")
def test_shard_tensor(world_size):
    run_func = partial(_run_shard_tensor, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


def _run_shard_param_v2(rank, world_size, port):
    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')

    param = torch.nn.Parameter(torch.randn(2, 3))
    param_ref = deepcopy(param)
    sparam = ShardedParamV2(param=param, process_group=None)

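    # the sharded payload should initially match the original parameter's data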
    allclose(sparam.sharded_data_tensor.payload, param_ref.data)

    # Test get_memory_usage()
    sparam.fp32_grad = torch.randn(2, 3)
    cuda_mem_use, cpu_mem_use = sparam.get_memory_usage()
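    # expected CPU bytes: fp32 payload (2 * 3 * 4) + fp32 grad (2 * 3 * 4) = 48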
    assert cpu_mem_use == 2 * 3 * 4 * 2, f"cpu_mem_use: {cpu_mem_use}"

    sparam.remove_torch_payload()
    assert param.data.numel() == 1
    cuda_mem_use, cpu_mem_use = sparam.get_memory_usage()
    # plus 4 bytes for the single-element fp32 dummy tensor that now backs param.data
    assert cpu_mem_use == 2 * 3 * 4 * 2 + 4

    sparam.fp16_grad = torch.randn(2, 3).cuda().half()
    cuda_mem_use, cpu_mem_use = sparam.get_memory_usage()
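    # the fp16 grad lives on CUDA (2 * 3 elements at 2 bytes each); CPU usage is unchanged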
    assert cpu_mem_use == 2 * 3 * 4 * 2 + 4
    assert cuda_mem_use == 2 * 3 * 2

    sparam.fp16_grad = None
    sparam.fp32_grad = torch.randn(2, 3)
    sparam.remove_torch_payload()
    cuda_mem_use, cpu_mem_use = sparam.get_memory_usage()
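    # dropping the fp16 grad should free all CUDA memory; the CPU side still
    # holds the fp32 grad plus the 4-byte dummy payload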
    assert cpu_mem_use == 2 * 3 * 4 * 2 + 4
    assert cuda_mem_use == 0

    # attach a grad to the torch param and point param.data back at the payload
    param.data = sparam.sharded_data_tensor.payload
    param.grad = torch.randn(2, 3)
    cuda_mem_use, cpu_mem_use = sparam.get_memory_usage()
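    # expected CPU bytes: payload (24, counted once even though param.data aliases it)
    # + sparam's fp32 grad (24) + param.grad (24) = 72; nothing is on CUDA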
    assert cpu_mem_use == 2 * 3 * 4 * 2 + 2 * 3 * 4, f"cpu_mem_use {cpu_mem_use}"
    assert cuda_mem_use == 0

    # reuse the torch param's grad as sparam's fp32 grad
    sparam.fp32_grad = param.grad
    cuda_mem_use, cpu_mem_use = sparam.get_memory_usage()
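    # the grad tensor is now shared, so it is counted once: payload (24) + grad (24) = 48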
    assert cpu_mem_use == 2 * 3 * 4 * 2
    assert cuda_mem_use == 0


@pytest.mark.dist
@pytest.mark.parametrize("world_size", [1, 2])
@rerun_on_exception(exception_type=mp.ProcessRaisedException, pattern=".*Address already in use.*")
def test_shard_param_v2(world_size):
    run_func = partial(_run_shard_param_v2, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    # test_shard_tensor(2)
    test_shard_param_v2(2)