"git@developer.sourcefind.cn:zhaoyu6/sglang.git" did not exist on "c673727e0efff7f63674abe10685e9b7adef3174"
test_sharded_ddp.py 3.35 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.

"""
Testing OssDdp class.
"""

import tempfile

import pytest
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn import Linear, Sequential

18
from fairscale.nn.data_parallel import ShardedDataParallel
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# Pytest markers for hardware-dependent tests. Both conditions are evaluated
# once, when this module is imported.
skip_if_no_cuda = pytest.mark.skipif(
    not torch.cuda.is_available(),
    reason="cuda required",
)
skip_if_single_gpu = pytest.mark.skipif(
    torch.cuda.device_count() < 2,
    reason="multiple GPUs required",
)


def test_on_cpu():
    """Run the sharded data-parallel step check on CPU with the GLOO backend."""
    cpu = torch.device("cpu")
    run_test(backend=dist.Backend.GLOO, device=cpu)


@skip_if_no_cuda
@skip_if_single_gpu
def test_on_gpu():
    """Run the sharded data-parallel step check on CUDA with the NCCL backend.

    Skipped when CUDA is unavailable or fewer than two GPUs are visible.
    """
    gpu = torch.device("cuda")
    run_test(backend=dist.Backend.NCCL, device=gpu)


def run_one_step(rank, world_size, backend, device, temp_file_name):
    """Run one forward/backward/reduce step on a single rank and check the result.

    Used as the per-process entry point handed to ``mp.spawn`` by ``run_test``.

    Args:
        rank: Index of this process within the process group.
        world_size: Total number of processes in the group.
        backend: ``torch.distributed`` backend passed to ``init_process_group``.
        device: ``torch.device`` the model and the input batch are moved to.
        temp_file_name: Path of the file used for the ``file://`` rendezvous.

    Raises:
        AssertionError: If a trainable parameter of the local optimizer has an
            all-zero gradient after ``reduce()``, or if a module buffer is not
            ``0.0`` after the forward pass.
    """
    url = "file://" + temp_file_name
    dist.init_process_group(init_method=url, backend=backend, rank=rank, world_size=world_size)

    on_cuda = device == torch.device("cuda")
    if on_cuda:
        torch.cuda.set_device(rank)

    # Any model works. Add one different buffer per rank (its value is the
    # rank), then move parameters and the buffer to the target device together.
    model = Sequential(Linear(2, 3), Linear(3, 4))
    model.register_buffer("test_buffer", torch.ones((1)) * rank)
    model.to(device)

    ddp = ShardedDataParallel(
        module=model,
        optimizer=torch.optim.SGD,
        optimizer_params={"lr": 0.01, "momentum": 0.99},
        world_size=world_size,
        broadcast_buffers=True,
    )
    optimizer = ddp.optimizer
    model = ddp.module

    input_tensor = torch.rand((64, 2)).to(device)
    output = ddp(input_tensor).abs().sum() / input_tensor.numel()
    output.backward()
    ddp.reduce()

    # Check that all the grads have been populated, for the shard
    if on_cuda:
        torch.cuda.synchronize()  # flush any remaining cuda op, just in case

    for pg in optimizer.optim.param_groups:
        for param in pg["params"]:
            if param.requires_grad:
                assert param.grad.abs().sum().item() > 0.0, "The reduce step should have populated all the gradients"

    # Check that all the buffers are in sync (authoritative rank is 0, its buffer is 0)
    for b in model.buffers():
        assert b.cpu().item() == 0.0, "Buffers should have been synced to the rank 0 value"
72
73
74
75
76


def run_test(backend, device, world_size=2):
    """Spawn ``world_size`` processes that each execute ``run_one_step``.

    Args:
        backend: ``torch.distributed`` backend forwarded to every process.
        device: ``torch.device`` forwarded to every process.
        world_size: Number of processes to spawn (default 2).
    """
    # Create the empty rendezvous file and close our handle right away.
    # ``tempfile.mkstemp()[1]`` would discard the open file descriptor and
    # leak it for the lifetime of the test process.
    with tempfile.NamedTemporaryFile(delete=False) as rendezvous_file:
        temp_file_name = rendezvous_file.name

    mp.spawn(run_one_step, args=(world_size, backend, device, temp_file_name), nprocs=world_size, join=True)
# (blame annotation from the web view: Min Xu committed)
77
78
79
80
81
82
83
84


def run_eval_mode(_unused):
    """Check that eval mode allows repeated forward passes and train mode does not.

    Runs inside a single-process GLOO group (rank 0, world size 1).

    Args:
        _unused: Ignored. Present because ``mp.spawn`` calls the target with
            the process index as its first argument.

    Raises:
        AssertionError: If repeated forward passes in training mode do not
            raise ``RuntimeError``.
    """
    # Create the empty rendezvous file and close our handle right away;
    # ``tempfile.mkstemp()[1]`` would leak the open file descriptor.
    with tempfile.NamedTemporaryFile(delete=False) as rendezvous_file:
        init_url = f"file://{rendezvous_file.name}"

    dist.init_process_group(init_method=init_url, backend=dist.Backend.GLOO, rank=0, world_size=1)
    model = Sequential(Linear(2, 3), Linear(3, 4))
    optimizer_params = {"lr": 0.1, "momentum": 0.99}
    ddp = ShardedDataParallel(model, torch.optim.SGD, optimizer_params, 1, broadcast_buffers=False)

    # In eval mode, back-to-back forward passes must not raise.
    ddp.eval()
    for _ in range(5):
        ddp(torch.rand((64, 2)))

    # In train mode, repeated forward passes are expected to raise RuntimeError.
    ddp.train()
    try:
        for _ in range(5):
            ddp(torch.rand((64, 2)))
    except RuntimeError:
        pass
    else:
        # Explicit raise rather than ``assert False`` so the check survives ``python -O``.
        raise AssertionError("Multiple forward passes on training mode should not pass")


def test_eval_mode():
    """Execute ``run_eval_mode`` in a spawned process and wait for it to finish."""
    mp.spawn(
        run_eval_mode,
        args=(),
        join=True,
    )