test_param_op.py
import copy
from typing import Optional

import torch

from colossalai.testing import clear_cache_before_run
from colossalai.zero.legacy.gemini.paramhooks import BaseParamHookMgr
from tests.components_to_test.registry import non_distributed_component_funcs


def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool:
    if loose:
        return torch.allclose(tensor_a, tensor_b, atol=1e-3, rtol=1e-3)
    return torch.allclose(tensor_a, tensor_b)
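
# Why the `loose` tolerance exists: presumably to absorb fp16/AMP rounding
# error, which can exceed torch.allclose's defaults (rtol=1e-05, atol=1e-08).
# For example:
#   allclose(torch.tensor([1.0]), torch.tensor([1.0005]))              # False
#   allclose(torch.tensor([1.0]), torch.tensor([1.0005]), loose=True)  # True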


def run_model(model, inputs, label, criterion, use_param_hook=False):
    if use_param_hook:

        class HookWrapper:

            def __init__(self) -> None:
                self.hook_triggered_times = 0

            def wrapper_func(self):

                def hook(param, grad) -> Optional[torch.Tensor]:
                    self.hook_triggered_times += 1
                    return grad

                return hook

        hookwrapper = HookWrapper()
        param_list = list(model.parameters())
        hook_mgr = BaseParamHookMgr(param_list)
        hook_mgr.register_backward_hooks(hookwrapper.wrapper_func())
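        # The hooks just registered should fire once per parameter during
        # backward(), incrementing hook_triggered_times on each call.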

    model.zero_grad(set_to_none=True)

    with torch.cuda.amp.autocast():
        if criterion:
            y = model(inputs)
            loss = criterion(y, label)
        else:
            loss = model(inputs, label)
        loss = loss.float()
    loss.backward()

    if use_param_hook:
        hook_mgr.remove_hooks()
        return hookwrapper.hook_triggered_times
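

# For reference: a minimal sketch of what a per-parameter hook manager such as
# BaseParamHookMgr (imported above; implementation not shown here) presumably
# does. This is an illustrative stand-in, not ColossalAI's actual code.
class _SketchParamHookMgr:

    def __init__(self, param_list):
        self.param_list = param_list
        self._handles = []

    def register_backward_hooks(self, hook_call):
        for p in self.param_list:
            if p.requires_grad:
                # Tensor.register_hook passes only the gradient, so bind the
                # parameter to match the hook(param, grad) signature used above.
                self._handles.append(p.register_hook(lambda grad, p=p: hook_call(p, grad)))

    def remove_hooks(self):
        for handle in self._handles:
            handle.remove()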


@clear_cache_before_run()
def test_base_param_hook():
    test_models = ['repeated_computed_layers', 'resnet18', 'hanging_param_model', 'inline_op_model']
    # test_models = ['bert']

    for model_name in test_models:
        get_components_func = non_distributed_component_funcs.get_callable(model_name)
        model_builder, train_dataloader, _, _, criterion = get_components_func()

        torch.manual_seed(0)
        model = model_builder(checkpoint=True).cuda()
        model.train()

        for i, (inputs, label) in enumerate(train_dataloader):
            if i > 0:
                break
            model_copy = copy.deepcopy(model)

            run_model(model, inputs.cuda(), label.cuda(), criterion, False)
            ret2 = run_model(model_copy, inputs.cuda(), label.cuda(), criterion, True)

        # Make sure the param hook is fired exactly once per parameter, even
        # in the presence of parameter sharing.
        assert ret2 == len(list(model.parameters()))
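        # Illustration of the sharing concern (hypothetical tied-weight setup,
        # not one of the registry models): a shared Parameter is yielded only
        # once by Module.parameters(), so the count above holds even when
        # modules tie weights, e.g.
        #   first, second = torch.nn.Linear(4, 4), torch.nn.Linear(4, 4)
        #   second.weight = first.weight           # tie the weights
        #   tied_model = torch.nn.Sequential(first, second)
        #   len(list(tied_model.parameters()))     # 3: shared weight + 2 biases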

        for p, p_copy in zip(model.parameters(), model_copy.parameters()):
            assert allclose(p.grad, p_copy.grad), f"{p.grad} vs {p_copy.grad}"


if __name__ == '__main__':
    test_base_param_hook()