# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.

# pylint: disable=missing-module-docstring
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring

""" Test AdaScale with DDP/SDP/FSDP.

    Even though it is tested here, AdaScale does NOT work with SDP/FSDP the
    same way as DDP & gradient accumulation modes, because the full
    gradients are not sent to each worker.

    So they only have a slice of the reduced gradient in FSDP's case or
    only a subset of gradients are reduced in SDP's. OTOH, each AdaScale
    work receives full local-gradient.  So the gain value computation is
    off. If they use a slice (or subset) of their local-gradient, the gain
    values they each compute will be different, which might or might not
    be helpful for training.
"""

import tempfile

import numpy as np
import pytest
import torch
from torch import Tensor
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn import Linear
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim import SGD

from fairscale.fair_dev.testing.golden_testing_data import adascale_test_data
from fairscale.fair_dev.testing.testing import skip_if_single_gpu
from fairscale.internal import torch_version
from fairscale.nn.data_parallel import FullyShardedDataParallel as FSDP
from fairscale.nn.data_parallel import ShardedDataParallel as SDP
from fairscale.optim import OSS, AdaScale
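

# Illustrative sketch only: _illustrative_gain is a hypothetical helper, not part
# of fairscale's API and not used by the tests below. It is a rough version of
# the gain AdaScale estimates from S per-worker gradients, assuming the
# sigma^2 / mu^2 estimators from the AdaScale paper (the real implementation also
# smooths these estimates across steps). It shows why each worker needs the full
# averaged gradient: with only a slice (FSDP) or a subset (SDP) of the reduced
# gradient, the estimates, and therefore the gain, would differ per worker.
def _illustrative_gain(local_grads):
    """Estimate the gain from a list of S (>= 2) flattened local gradients."""
    S = len(local_grads)
    avg_grad = torch.stack(local_grads).mean(dim=0)
    local_sqr = torch.stack([g.pow(2).sum() for g in local_grads]).mean()
    avg_sqr = avg_grad.pow(2).sum()
    # Unbiased estimates of the gradient variance (sigma^2) and of the squared
    # norm of the true gradient (mu^2).
    var = (local_sqr - avg_sqr) * S / (S - 1)
    sqr = avg_sqr - var / S
    return float((var + sqr) / (var / S + sqr))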


def _dist_init(rank, world_size, tempfile_name, backend):
    url = "file://" + tempfile_name
    dist.init_process_group(init_method=url, backend=backend, rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)


def _test_basic_func(rank, ddp_cls, world_size, tempfile_name, test_case):
    _dist_init(rank, world_size, tempfile_name, backend="nccl")  # Covers nccl

    model = Linear(2, 2)
    model.to("cuda")
    if ddp_cls is DDP:
        model = ddp_cls(model, device_ids=[rank])
        optim = AdaScale(SGD(model.parameters(), lr=0.1))
    elif ddp_cls is SDP:
        optim = AdaScale(OSS(model.parameters(), SGD, lr=0.1))
        model = ddp_cls(model, sharded_optimizer=optim)
    else:
        assert ddp_cls is FSDP, ddp_cls
        # Two cases:
        #    flatten=True : the AdaScale wrapper must come after FSDP, and it
        #                   receives a single flattened grad tensor. It would
        #                   not receive any grad if wrapped before FSDP.
        #    flatten=False: AdaScale can come either before or after FSDP.
        # So it is better to apply AdaScale after FSDP.
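        # For reference (not exercised in this test), the flatten=True ordering
        # would be sketched as:
        #     model = ddp_cls(model)  # flatten_parameters defaults to True
        #     optim = AdaScale(SGD(model.parameters(), lr=0.1))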
        model = ddp_cls(model, flatten_parameters=False)
        optim = AdaScale(SGD(model.parameters(), lr=0.1))
    if "input" in test_case:
        # single iter
        in_data = Tensor(test_case["input"][rank])
        in_data = in_data.cuda()
        out = model(in_data)
        out.sum().backward()
        if ddp_cls is DDP:
            assert np.allclose(optim.gain(), test_case["expected_gain"]), optim.gain()
            w, b = model.parameters()
            assert np.allclose(w.grad.cpu(), test_case["expected_grad"]), w.grad
            assert np.allclose(b.grad.cpu(), test_case["expected_bias_grad"]), b.grad
        optim.step()
        optim.zero_grad()
    else:
        # multiple iters
        n = len(test_case["inputs"])
        for i, in_data in enumerate(test_case["inputs"]):
            in_data = Tensor(in_data[rank]).cuda()
            out = model(in_data)
            out.sum().backward()
            if i == n - 1 and ddp_cls is DDP:
                assert np.allclose(optim.gain(), test_case["expected_gain"]), optim.gain()
                w, b = model.parameters()
                assert np.allclose(w.grad.cpu(), test_case["expected_grad"]), w.grad
                assert np.allclose(b.grad.cpu(), test_case["expected_bias_grad"]), b.grad
            optim.step()
            optim.zero_grad()

    dist.destroy_process_group()


@skip_if_single_gpu
@pytest.mark.parametrize("ddp_cls", [DDP])
@pytest.mark.parametrize("test_case", adascale_test_data)
def test_basic(ddp_cls, test_case):
    """Test adascale with DDP without gradient accumulation"""
    world_size = 2
    temp_file_name = tempfile.mkstemp()[1]

    mp.spawn(_test_basic_func, args=(ddp_cls, world_size, temp_file_name, test_case), nprocs=world_size, join=True)


@skip_if_single_gpu
@pytest.mark.parametrize("ddp_cls", [DDP, SDP, FSDP])
@pytest.mark.parametrize("test_case", adascale_test_data[:1])
def test_basic_all_dp(ddp_cls, test_case):
    """Test adascale with DDP/SDP/FSDP with just one test case."""
    test_basic(ddp_cls, test_case)


def _test_grad_accum_func(rank, world_size, tempfile_name):
    _dist_init(rank, world_size, tempfile_name, backend="gloo")  # Covers gloo

    model = Linear(4, 2, bias=False)
    model.to("cuda")
    model = DDP(model, device_ids=[rank])
    optim = AdaScale(SGD(model.parameters(), lr=0.1), num_gradients_to_accumulate=2)
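    # The first backward runs under DDP's no_sync(), so its gradient stays local
    # (no allreduce); the second backward, outside the context, triggers the
    # allreduce of the accumulated gradients across the two workers.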
    with model.no_sync():
        # iter 1, input vectors are pointing dim0 and dim1
        in_data = Tensor([0.0] * 4)
        in_data[rank] = 1.0
        in_data = in_data.cuda()
        out = model(in_data)
        out.sum().backward()
    # iter 2, input vectors are pointing dim2 and dim3
    in_data = Tensor([0.0] * 4)
    in_data[rank + 2] = 1.0
    in_data = in_data.cuda()
    out = model(in_data)
    out.sum().backward()
    # since all inputs are orthogonal, the gain should be exactly 4.0.
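    # With 2 workers x 2 accumulation steps, AdaScale sees 4 orthogonal,
    # equal-norm gradients, so (per the estimators sketched in
    # _illustrative_gain above) the estimated mu^2 is ~0 and the gain
    # reduces to the scale S = 4.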
    assert np.allclose(optim.gain(), 4.0), optim.gain()
    optim.step()
    optim.zero_grad()

    dist.destroy_process_group()


@skip_if_single_gpu
def test_grad_accum():
    """Test adascale with DDP + gradient accumulation using ddp.no_sync()"""
    world_size = 2
    temp_file_name = tempfile.mkstemp()[1]

    mp.spawn(_test_grad_accum_func, args=(world_size, temp_file_name), nprocs=world_size, join=True)


def _test_corr_mean_func(rank, world_size, tempfile_name, test_case):
    _dist_init(rank, world_size, tempfile_name, backend="gloo")  # Covers gloo

    model = Linear(3, 1, bias=False)
    model.to("cuda")
    model = DDP(model, device_ids=[rank])
    optim = AdaScale(SGD(model.parameters(), lr=0.1))
    results = []
    last_grad = None
    for i, in_data in enumerate(test_case["inputs"]):
        # use no_sync so we can access nonreduced gradients
        with model.no_sync():
            in_data = Tensor(in_data[rank]).cuda()
            out = model(in_data)
            out.sum().backward()
            results.append(optim._compute_intra_grad_corr_mean().item())
        # sync gradients manually
        for p in model.parameters():
            if p.grad is not None:
                dist.all_reduce(p.grad, op=dist.ReduceOp.SUM)
                # divide by world size
                p.grad.data.div_(world_size)
        grad = optim._gather_flat_grad()
        assert np.allclose(grad.cpu(), test_case["expected_grad"][i])
        optim.step()
        if last_grad is not None:
            # compute cosine similarity
            cos_similarity = torch.dot(grad, last_grad) / (grad.norm() * last_grad.norm())
            assert np.allclose(cos_similarity.cpu(), test_case["expected_cos_similarity"][i])
        last_grad = grad
        optim.zero_grad()
    assert np.allclose(results, test_case["expected_corr"]), results

    dist.destroy_process_group()


@skip_if_single_gpu
@pytest.mark.skipif(
    torch_version() < (1, 10, 0),
    reason="torch.corrcoef available only for torch 1.10 or higher",
)
def test_corr_mean():
    """
    Test _compute_intra_grad_corr_mean and _gather_flat_grad using ddp.no_sync().
    We also demonstrate how cosine similarity between consecutive gradients can
    be computed using _gather_flat_grad.
    """
    world_size = 2
    temp_file_name = tempfile.mkstemp()[1]

    from fairscale.fair_dev.testing.golden_testing_data import corr_mean_test_data

    test_case = corr_mean_test_data[0]

    mp.spawn(_test_corr_mean_func, args=(world_size, temp_file_name, test_case), nprocs=world_size, join=True)