import unittest

import backend as F

from dgl.cuda import nccl
from dgl.partition import NDArrayPartition


def gen_test_id():
    """Return a fixed, zero-padded 256-hex-digit id string for tests."""
    # format(n, spec) produces the same text as "{:0256x}".format(n):
    # the constant rendered as lowercase hex, left-padded with zeros to
    # 256 characters (the width of an NCCL unique id's string form).
    return format(78236728318467363, "0256x")


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_id():
    """Check NCCL unique ids round-trip through their string form."""
    original = nccl.UniqueId()

    # An id rebuilt from its own string representation compares equal.
    rebuilt = nccl.UniqueId(id_str=str(original))
    assert original == rebuilt

    # A different id string yields an id that compares unequal.
    other = nccl.UniqueId(gen_test_id())
    assert other != original

    # That distinct id must also survive a str() round-trip.
    other_again = nccl.UniqueId(str(other))
    assert other == other_again


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_push_single_remainder():
    """Single-rank sparse push over a 'remainder' partition is an identity."""
    comm = nccl.Communicator(1, 0, nccl.UniqueId())

    num_rows = 10000
    idx = F.randint([num_rows], F.int32, F.ctx(), 0, num_rows)
    val = F.uniform([num_rows, 100], F.float32, F.ctx(), -1.0, 1.0)

    part = NDArrayPartition(num_rows, 1, "remainder")

    # With exactly one rank every element is "sent" to ourselves, so the
    # pushed indices and values must come back unchanged.
    out_idx, out_val = comm.sparse_all_to_all_push(idx, val, part)
    assert F.array_equal(out_idx, idx)
    assert F.array_equal(out_val, val)


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_pull_single_remainder():
    """Single-rank sparse pull over a 'remainder' partition equals gather."""
    comm = nccl.Communicator(1, 0, nccl.UniqueId())

    num_rows = 100000
    req_idx = F.randint([10000], F.int64, F.ctx(), 0, num_rows)
    val = F.uniform([num_rows, 100], F.float32, F.ctx(), -1.0, 1.0)

    part = NDArrayPartition(num_rows, 1, "remainder")

    # With one rank, pulling rows through NCCL must match a plain
    # local gather of the requested rows.
    pulled = comm.sparse_all_to_all_pull(req_idx, val, part)
    expected = F.gather_row(val, req_idx)
    assert F.array_equal(pulled, expected)


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_push_single_range():
    """Single-rank sparse push over a 'range' partition is an identity."""
    comm = nccl.Communicator(1, 0, nccl.UniqueId())

    num_rows = 10000
    idx = F.randint([num_rows], F.int32, F.ctx(), 0, num_rows)
    val = F.uniform([num_rows, 100], F.float32, F.ctx(), -1.0, 1.0)

    # A single range [0, num_rows) assigns every row to rank 0.
    bounds = F.copy_to(
        F.tensor([0, val.shape[0]], dtype=F.int64), F.ctx()
    )
    part = NDArrayPartition(num_rows, 1, "range", part_ranges=bounds)

    # With one rank the push round-trips the data unchanged.
    out_idx, out_val = comm.sparse_all_to_all_push(idx, val, part)
    assert F.array_equal(out_idx, idx)
    assert F.array_equal(out_val, val)


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_sparse_pull_single_range():
    """Single-rank sparse pull over a 'range' partition equals gather."""
    comm = nccl.Communicator(1, 0, nccl.UniqueId())

    num_rows = 100000
    req_idx = F.randint([10000], F.int64, F.ctx(), 0, num_rows)
    val = F.uniform([num_rows, 100], F.float32, F.ctx(), -1.0, 1.0)

    # A single range [0, num_rows) assigns every row to rank 0.
    bounds = F.copy_to(
        F.tensor([0, val.shape[0]], dtype=F.int64), F.ctx()
    )
    part = NDArrayPartition(num_rows, 1, "range", part_ranges=bounds)

    # With one rank, pulling rows through NCCL must match a plain
    # local gather of the requested rows.
    pulled = comm.sparse_all_to_all_pull(req_idx, val, part)
    expected = F.gather_row(val, req_idx)
    assert F.array_equal(pulled, expected)


@unittest.skipIf(
    F._default_context_str == "cpu", reason="NCCL only runs on GPU."
)
def test_nccl_support():
    """Smoke test: nccl.is_supported() must be callable without raising."""
    # There is currently no other way to detect whether NCCL support was
    # compiled in, so simply exercising the query is the whole test.
    _ = nccl.is_supported()


if __name__ == "__main__":
    # Run all tests directly (without pytest discovery).
    # Fixed: the previous version called test_nccl_sparse_push_single()
    # and test_nccl_sparse_pull_single(), which are not defined in this
    # file (the real names end in _remainder/_range), so running the
    # script raised NameError.
    test_nccl_id()
    test_nccl_sparse_push_single_remainder()
    test_nccl_sparse_pull_single_remainder()
    test_nccl_sparse_push_single_range()
    test_nccl_sparse_pull_single_range()
    test_nccl_support()