# test_param_op.py
import copy
from typing import Optional

import torch

from colossalai.gemini.paramhooks import BaseParamHookMgr
from tests.components_to_test.registry import non_distributed_component_funcs
7
8
9
10
11
12
13
14


def allclose(tensor_a: torch.Tensor, tensor_b: torch.Tensor, loose=False) -> bool:
    """Report whether two tensors are element-wise close.

    Args:
        tensor_a: First tensor to compare.
        tensor_b: Second tensor to compare.
        loose: When true, compare with ``atol=1e-3`` and ``rtol=1e-3``;
            otherwise ``torch.allclose`` is called with its own defaults.

    Returns:
        The result of ``torch.allclose`` for the chosen tolerances.
    """
    # An empty mapping leaves torch.allclose on its default tolerances.
    tolerances = {"atol": 1e-3, "rtol": 1e-3} if loose else {}
    return torch.allclose(tensor_a, tensor_b, **tolerances)


15
16
def run_model(model, inputs, label, criterion, use_param_hook=False):
    """Run one forward/backward pass, optionally counting parameter-hook calls.

    The forward pass and loss computation run inside
    ``torch.cuda.amp.autocast()``; the loss is cast to float before
    ``backward()`` is called.

    Args:
        model: Module to run. Its gradients are reset with
            ``zero_grad(set_to_none=True)`` before the forward pass.
        inputs: Batch passed to ``model``.
        label: Target passed to ``criterion`` or, when ``criterion`` is
            falsy, directly to ``model`` as a second argument.
        criterion: Loss callable invoked as ``criterion(output, label)``.
            If falsy, ``model(inputs, label)`` is expected to return the loss.
        use_param_hook: When true, a counting hook is registered on all of the
            model's parameters through ``BaseParamHookMgr`` for the duration
            of this call.

    Returns:
        The number of times the hook was called when ``use_param_hook`` is
        true, otherwise ``None``.
    """
    hook_mgr = None
    hook_triggered_times = 0

    if use_param_hook:

        def hook(param, grad) -> Optional[torch.Tensor]:
            # Count the call and hand the gradient back unchanged.
            nonlocal hook_triggered_times
            hook_triggered_times += 1
            return grad

        hook_mgr = BaseParamHookMgr(list(model.parameters()))
        hook_mgr.register_backward_hooks(hook)

    try:
        model.zero_grad(set_to_none=True)

        with torch.cuda.amp.autocast():
            if criterion:
                y = model(inputs)
                loss = criterion(y, label)
            else:
                loss = model(inputs, label)
            loss = loss.float()
        loss.backward()
    finally:
        # Remove the hooks even if forward/backward raised, so the model is
        # not left with stale hooks attached.
        if hook_mgr is not None:
            hook_mgr.remove_hooks()

    return hook_triggered_times if use_param_hook else None


def test_base_param_hook():
    """Check that parameter hooks fire once per parameter and leave gradients unchanged.

    For each listed model, one batch is run through the model without hooks
    and through a deep copy of it with a counting hook registered. The hook
    count must equal the number of parameters, and every gradient of the
    hooked copy must match the corresponding gradient of the unhooked model.
    """
    test_models = ['repeated_computed_layers', 'resnet18', 'no_leaf_module', 'inline_op_module']
    # test_models = ['bert']

    for model_name in test_models:
        get_components_func = non_distributed_component_funcs.get_callable(model_name)
        model_builder, train_dataloader, _, _, criterion = get_components_func()

        torch.manual_seed(0)
        model = model_builder(checkpoint=True).cuda()
        model.train()

        # Only the first batch is used. Fail with a clear message instead of a
        # NameError further down if the dataloader yields nothing.
        first_batch = next(iter(train_dataloader), None)
        assert first_batch is not None, f"{model_name}: train dataloader yielded no batches"
        inputs, label = first_batch
        inputs, label = inputs.cuda(), label.cuda()

        # Copy before any backward pass so both models start from identical state.
        model_copy = copy.deepcopy(model)

        run_model(model, inputs, label, criterion, False)
        hook_fired_times = run_model(model_copy, inputs, label, criterion, True)

        # Make sure the param hook has only been fired once per parameter,
        # even in case of parameter sharing.
        assert hook_fired_times == len(list(model.parameters()))

        for p, p_copy in zip(model.parameters(), model_copy.parameters()):
            assert allclose(p.grad, p_copy.grad), f"{p.grad} vs {p_copy.grad}"
77
78
79
80


# Allow running this test directly as a script; the test moves the model and
# batch to CUDA, so a CUDA device is needed either way.
if __name__ == "__main__":
    test_base_param_hook()