"examples/vscode:/vscode.git/clone" did not exist on "dc19cd5687165c9ae4ebb3e4c3199fd55d2155e6"
test_ffi-stream.py 6.42 KB
Newer Older
1
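"""Tests for DGL's stream FFI: getting/setting the current CUDA stream and
record_stream semantics for NDArrays and graphs (GPU only)."""
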
import unittest
from statistics import mean

import backend as F

import dgl
import dgl.ndarray as nd
import dgl.ops as OPS
import numpy as np
import torch
from dgl import rand_graph
from dgl._ffi.streams import _dgl_get_stream, to_dgl_stream_handle
from dgl.utils import to_dgl_context


# borrowed from PyTorch, torch/testing/_internal/common_utils.py
def _get_cycles_per_ms() -> float:
    """Measure and return approximate number of cycles per millisecond for torch.cuda._sleep"""

    def measure() -> float:
        start = torch.cuda.Event(enable_timing=True)
        end = torch.cuda.Event(enable_timing=True)
        start.record()
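        # torch.cuda._sleep busy-spins the GPU for the given number of
        # cycles; timing the spin with CUDA events yields cycles per ms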
        torch.cuda._sleep(1000000)
        end.record()
        end.synchronize()
        cycles_per_ms = 1000000 / start.elapsed_time(end)
        return cycles_per_ms

    # Get 10 values, remove the 2 max and 2 min, and return the average.
    # This avoids system disturbances that skew the results, e.g. the
    # very first CUDA call likely does a bunch of init, which takes much
    # longer than subsequent calls.
    num = 10
    vals = []
    for _ in range(num):
        vals.append(measure())
    vals = sorted(vals)
    return mean(vals[2 : num - 2])


@unittest.skipIf(
    F._default_context_str == "cpu", reason="stream only runs on GPU."
)
def test_basics():
    g = rand_graph(10, 20, device=F.cpu())
    x = torch.ones(g.num_nodes(), 10)
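    # reference result: computed on CPU, then moved to the GPU for comparison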
    result = OPS.copy_u_sum(g, x).to(F.ctx())

    # launch on the default stream used by DGL
    xx = x.to(device=F.ctx())
    gg = g.to(device=F.ctx())
    OPS.copy_u_sum(gg, xx)
    assert torch.equal(OPS.copy_u_sum(gg, xx), result)

    # launch on a new stream created via torch.cuda
    s = torch.cuda.Stream(device=F.ctx())
    with torch.cuda.stream(s):
        xx = x.to(device=F.ctx(), non_blocking=True)
        gg = g.to(device=F.ctx())
        OPS.copy_u_sum(gg, xx)
    s.synchronize()
    assert torch.equal(OPS.copy_u_sum(gg, xx), result)


@unittest.skipIf(
    F._default_context_str == "cpu", reason="stream only runs on GPU."
)
def test_set_get_stream():
    current_stream = torch.cuda.current_stream()
    # test setting another stream
    s = torch.cuda.Stream(device=F.ctx())
    torch.cuda.set_stream(s)
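    # the stream set via torch.cuda should be visible to DGL through the FFI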
    assert (
        to_dgl_stream_handle(s).value
        == _dgl_get_stream(to_dgl_context(F.ctx())).value
    )
    # revert to default stream
    torch.cuda.set_stream(current_stream)


@unittest.skipIf(
    F._default_context_str == "cpu", reason="stream only runs on GPU."
)
# borrowed from PyTorch, test/test_cuda.py: test_record_stream()
def test_record_stream_ndarray():
    cycles_per_ms = _get_cycles_per_ms()

    t = nd.array(np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32), ctx=nd.cpu())
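    # pin the host array so the copy to GPU can be asynchronous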
    t.pin_memory_()
    result = nd.empty([4], ctx=nd.gpu(0))
    stream = torch.cuda.Stream()
    ptr = [None]

    # Performs the CPU->GPU copy in a background stream
    def perform_copy():
        with torch.cuda.stream(stream):
            tmp = t.copyto(nd.gpu(0))
            ptr[0] = F.from_dgl_nd(tmp).data_ptr()
        torch.cuda.current_stream().wait_stream(stream)
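        # record_stream marks tmp as in use on the current stream, so the
        # caching allocator won't reuse its block until that stream's
        # pending work has finished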
        tmp.record_stream(to_dgl_stream_handle(torch.cuda.current_stream()))
        torch.cuda._sleep(int(50 * cycles_per_ms))  # delay the copy
        result.copyfrom(tmp)

    perform_copy()
    with torch.cuda.stream(stream):
        tmp2 = nd.empty([4], ctx=nd.gpu(0))
        assert (
            F.from_dgl_nd(tmp2).data_ptr() != ptr[0]
        ), "allocation re-used too soon"

    assert torch.equal(
        F.from_dgl_nd(result).cpu(), torch.tensor([1.0, 2.0, 3.0, 4.0])
    )

    # Check that the block will be re-used after the main stream finishes
    torch.cuda.current_stream().synchronize()
    with torch.cuda.stream(stream):
        tmp3 = nd.empty([4], ctx=nd.gpu(0))
        assert (
            F.from_dgl_nd(tmp3).data_ptr() == ptr[0]
        ), "allocation not re-used"


@unittest.skipIf(
    F._default_context_str == "cpu", reason="stream only runs on GPU."
)
def test_record_stream_graph_positive():
    cycles_per_ms = _get_cycles_per_ms()

    g = rand_graph(10, 20, device=F.cpu())
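    # materialize all sparse formats so the device copy carries them along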
    g.create_formats_()
    x = torch.ones(g.num_nodes(), 10).to(F.ctx())
    g1 = g.to(F.ctx())
    # this is necessary to initialize the cuSPARSE handle
    result = OPS.copy_u_sum(g1, x)
    torch.cuda.current_stream().synchronize()

    stream = torch.cuda.Stream()
    results2 = torch.zeros_like(result)

    # Performs the computation in a background stream
    def perform_computing():
        with torch.cuda.stream(stream):
            g2 = g.to(F.ctx())
        torch.cuda.current_stream().wait_stream(stream)
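        # without record_stream, the allocator could hand g2's memory to a
        # later allocation on stream while the default stream still reads it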
        g2.record_stream(torch.cuda.current_stream())
        torch.cuda._sleep(int(50 * cycles_per_ms))  # delay the computation
        results2.copy_(OPS.copy_u_sum(g2, x))

    perform_computing()
    with torch.cuda.stream(stream):
        # since we have called record_stream for g2, g3 won't reuse its memory
        g3 = rand_graph(10, 20, device=F.ctx())
        g3.create_formats_()
    torch.cuda.current_stream().synchronize()
    assert torch.equal(result, results2)


@unittest.skipIf(
    F._default_context_str == "cpu", reason="stream only runs on GPU."
)
def test_record_stream_graph_negative():
    cycles_per_ms = _get_cycles_per_ms()

    g = rand_graph(10, 20, device=F.cpu())
    g.create_formats_()
    x = torch.ones(g.num_nodes(), 10).to(F.ctx())
    g1 = g.to(F.ctx())
    # this is necessary to initialize the cuSPARSE handle
    result = OPS.copy_u_sum(g1, x)
    torch.cuda.current_stream().synchronize()

    stream = torch.cuda.Stream()
    results2 = torch.zeros_like(result)

    # Performs the computation in a background stream
    def perform_computing():
        with torch.cuda.stream(stream):
            g2 = g.to(F.ctx())
        torch.cuda.current_stream().wait_stream(stream)
        # omitting record_stream here produces a wrong result
        # g2.record_stream(torch.cuda.current_stream())
        torch.cuda._sleep(int(50 * cycles_per_ms))  # delay the computation
        results2.copy_(OPS.copy_u_sum(g2, x))

    perform_computing()
    with torch.cuda.stream(stream):
        # g3 will reuse g2's memory block, resulting in a wrong result
        g3 = rand_graph(10, 20, device=F.ctx())
        g3.create_formats_()
    torch.cuda.current_stream().synchronize()
    assert not torch.equal(result, results2)


if __name__ == "__main__":
    test_basics()
    test_set_get_stream()
    test_record_stream_ndarray()
    test_record_stream_graph_positive()
    test_record_stream_graph_negative()