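"""Single-process tests for sparse all-to-all push/pull via dgl.cuda.nccl."""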
import unittest

import backend as F

import torch
import torch.distributed as dist

from dgl.cuda import nccl
from dgl.partition import NDArrayPartition


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_push_single_remainder():
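    """Push on a single rank with a 'remainder' partition: with world_size=1
    every index is owned by rank 0, so the pushed indices and values should be
    returned unchanged."""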
    torch.cuda.set_device("cuda:0")
    dist.init_process_group(
        backend="nccl",
        init_method="tcp://127.0.0.1:12345",
        world_size=1,
        rank=0,
    )

    index = F.randint([10000], F.int32, F.ctx(), 0, 10000)
    value = F.uniform([10000, 100], F.float32, F.ctx(), -1.0, 1.0)

    part = NDArrayPartition(10000, 1, "remainder")

    ri, rv = nccl.sparse_all_to_all_push(index, value, part)
    assert F.array_equal(ri, index)
    assert F.array_equal(rv, value)

    dist.destroy_process_group()


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_pull_single_remainder():
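    """Pull on a single rank with a 'remainder' partition: all requested rows
    live on rank 0, so the result should equal a local gather of `value` at
    `req_index`."""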
    torch.cuda.set_device("cuda:0")
    dist.init_process_group(
        backend="nccl",
        init_method="tcp://127.0.0.1:12345",
        world_size=1,
        rank=0,
    )

    req_index = F.randint([10000], F.int64, F.ctx(), 0, 100000)
    value = F.uniform([100000, 100], F.float32, F.ctx(), -1.0, 1.0)

    part = NDArrayPartition(100000, 1, "remainder")

    rv = nccl.sparse_all_to_all_pull(req_index, value, part)
    exp_rv = F.gather_row(value, req_index)
    assert F.array_equal(rv, exp_rv)

    dist.destroy_process_group()


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_push_single_range():
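    """Push on a single rank with a 'range' partition whose single range
    covers the whole array, so the pushed indices and values should be
    returned unchanged."""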
    torch.cuda.set_device("cuda:0")
    dist.init_process_group(
        backend="nccl",
        init_method="tcp://127.0.0.1:12345",
        world_size=1,
        rank=0,
    )

    index = F.randint([10000], F.int32, F.ctx(), 0, 10000)
    value = F.uniform([10000, 100], F.float32, F.ctx(), -1.0, 1.0)

    part_ranges = F.copy_to(
        F.tensor([0, value.shape[0]], dtype=F.int64), F.ctx()
    )
    part = NDArrayPartition(10000, 1, "range", part_ranges=part_ranges)

    ri, rv = nccl.sparse_all_to_all_push(index, value, part)
    assert F.array_equal(ri, index)
    assert F.array_equal(rv, value)

    dist.destroy_process_group()


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_pull_single_range():
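    """Pull on a single rank with a 'range' partition whose single range
    covers the whole array, so the result should equal a local gather of
    `value` at `req_index`."""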
    torch.cuda.set_device("cuda:0")
    dist.init_process_group(
        backend="nccl",
        init_method="tcp://127.0.0.1:12345",
        world_size=1,
        rank=0,
    )

    req_index = F.randint([10000], F.int64, F.ctx(), 0, 100000)
    value = F.uniform([100000, 100], F.float32, F.ctx(), -1.0, 1.0)

    part_ranges = F.copy_to(
        F.tensor([0, value.shape[0]], dtype=F.int64), F.ctx()
    )
    part = NDArrayPartition(100000, 1, "range", part_ranges=part_ranges)

    rv = nccl.sparse_all_to_all_pull(req_index, value, part)
    exp_rv = F.gather_row(value, req_index)
    assert F.array_equal(rv, exp_rv)

    dist.destroy_process_group()


if __name__ == "__main__":
    test_nccl_sparse_push_single_remainder()
    test_nccl_sparse_pull_single_remainder()
    test_nccl_sparse_push_single_range()
    test_nccl_sparse_pull_single_range()