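# test_stream.py (8.71 KB)
# Repository-viewer residue: "Newer Older" / "root's avatar" / "root committed".
# The bare integers that follow are the viewer's line-number gutter, not code.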
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import gc
import threading
import unittest

import pytest

import cupy
from cupy._creation import from_data
from cupy import cuda
from cupy import testing


@testing.parameterize(
    *testing.product({
        'stream_name': ['null', 'ptds'],
    }))
class TestStream(unittest.TestCase):
    """Tests for ``cuda.Stream``, parameterized over the null/PTDS streams.

    ``stream_name`` is injected by ``testing.parameterize``. ``setUp`` maps
    it to ``cuda.Stream.null`` or ``cuda.Stream.ptds``, stores the result in
    ``self.stream`` and makes it the current stream for each test.
    """

    def setUp(self):
        """Select the stream under test and make it current."""
        if cuda.runtime.is_hip and self.stream_name == 'ptds':
            self.skipTest('HIP does not support PTDS')

        # Remember what was current so tearDown can put it back.
        self._prev_stream = cuda.get_current_stream()

        if self.stream_name == 'null':
            self.stream = cuda.Stream.null
        elif self.stream_name == 'ptds':
            self.stream = cuda.Stream.ptds
        self.stream.use()

    def tearDown(self):
        """Restore the stream that was current before ``setUp`` ran."""
        self._prev_stream.use()

    @unittest.skipIf(cuda.runtime.is_hip, 'This test is only for CUDA')
    def test_eq_cuda(self):
        """Check ``==``/``!=`` between the fixture stream and new streams."""
        null0 = self.stream
        if self.stream == cuda.Stream.null:
            null1 = cuda.Stream(True)
            null2 = cuda.Stream(True)
            null3 = cuda.Stream(ptds=True)
        else:
            null1 = cuda.Stream(ptds=True)
            null2 = cuda.Stream(ptds=True)
            null3 = cuda.Stream(True)
        null4 = cuda.Stream()

        assert null0 == null1
        assert null1 == null2
        assert null2 != null3
        assert null2 != null4

    @unittest.skipIf(not cuda.runtime.is_hip, 'This test is only for HIP')
    def test_eq_hip(self):
        """HIP variant of the equality test (no PTDS streams involved)."""
        null0 = self.stream
        null1 = cuda.Stream(True)
        null2 = cuda.Stream(True)
        null3 = cuda.Stream()

        assert null0 == null1
        assert null1 == null2
        assert null2 != null3

    def test_hash(self):
        """Streams must be hashable and usable as dictionary keys."""
        hash(self.stream)
        hash(cuda.Stream(True))
        hash(cuda.Stream(False))
        mapping = {cuda.Stream(): 1, cuda.Stream(): 2}  # noqa

    def check_del(self, null, ptds):
        """Drop the only named reference to a stream that is still current.

        Args:
            null: Forwarded as ``cuda.Stream(null=...)``.
            ptds: Forwarded as ``cuda.Stream(ptds=...)``.
        """
        stream = cuda.Stream(null=null, ptds=ptds).use()
        assert stream is cuda.get_current_stream()
        stream_ptr = stream.ptr
        x = from_data.array([1, 2, 3])
        del stream
        # The current stream must still report the same pointer.
        assert stream_ptr == cuda.get_current_stream().ptr
        cuda.Stream.null.use()
        assert cuda.Stream.null is cuda.get_current_stream()
        # Want to test cudaStreamDestroy is issued, but
        # runtime.streamQuery(stream_ptr) causes SEGV. We cannot test...
        del x

    def test_del_default(self):
        """Run ``check_del`` on a stream with ``null=False, ptds=False``."""
        self.check_del(null=False, ptds=False)

    def test_del(self):
        """Run ``check_del`` with flags matching the fixture stream."""
        null = self.stream == cuda.Stream.null
        if cuda.runtime.is_hip:
            ptds = False
        else:
            ptds = self.stream == cuda.Stream.ptds

        self.check_del(null=null, ptds=ptds)

    def test_get_and_add_callback(self):
        """Callbacks added to a stream run in order with that stream."""
        N = 100
        cupy_arrays = [testing.shaped_random((2, 3)) for _ in range(N)]

        if not cuda.runtime.is_hip:
            stream = self.stream
        else:
            # adding callbacks to the null stream in HIP would segfault...
            stream = cuda.Stream()

        out = []
        stream_list = []

        def _callback(s, _, t):
            # Record the enqueue index and the stream the callback received.
            out.append(t[0])
            stream_list.append(s.ptr)

        for i, cupy_array in enumerate(cupy_arrays):
            numpy_array = cupy_array.get(stream=stream)
            stream.add_callback(
                _callback,
                (i, numpy_array))

        stream.synchronize()
        assert out == list(range(N))
        assert all(s == stream.ptr for s in stream_list)

    @unittest.skipIf(cuda.runtime.is_hip,
                     'HIP does not support launch_host_func')
    def test_launch_host_func(self):
        """Host functions launched on a stream run in enqueue order."""
        N = 100
        cupy_arrays = [testing.shaped_random((2, 3)) for _ in range(N)]

        # NOTE(review): this uses cuda.Stream.null rather than self.stream,
        # so both parameterizations exercise the null stream -- confirm
        # that this is intended.
        stream = cuda.Stream.null

        out = []
        for i, cupy_array in enumerate(cupy_arrays):
            numpy_array = cupy_array.get(stream=stream)
            stream.launch_host_func(
                lambda t: out.append(t[0]), (i, numpy_array))

        stream.synchronize()
        assert out == list(range(N))

    def test_with_statement(self):
        """Nested ``with stream:`` blocks switch and restore the current one."""
        stream1 = cuda.Stream()
        stream2 = cuda.Stream()
        assert self.stream == cuda.get_current_stream()
        with stream1:
            assert stream1 == cuda.get_current_stream()
            with stream2:
                assert stream2 == cuda.get_current_stream()
            assert stream1 == cuda.get_current_stream()
        # self.stream is "forgotten"!
        assert cuda.Stream.null == cuda.get_current_stream()

    def test_use(self):
        """``Stream.use()`` makes the stream current and returns it."""
        stream1 = cuda.Stream().use()
        assert stream1 == cuda.get_current_stream()
        self.stream.use()
        assert self.stream == cuda.get_current_stream()

    @testing.multi_gpu(2)
    def test_per_device(self):
        """The current stream is tracked separately for each device."""
        with cuda.Device(0):
            stream0 = cuda.Stream()
            with stream0:
                assert stream0 == cuda.get_current_stream()
                with cuda.Device(1):
                    assert stream0 != cuda.get_current_stream()
                    assert cuda.Stream.null == cuda.get_current_stream()
                assert stream0 == cuda.get_current_stream()

    @testing.multi_gpu(2)
    def test_per_device_failure(self):
        """Using a device-0 stream while device 1 is current must raise."""
        with cuda.Device(0):
            stream0 = cuda.Stream()
        with cuda.Device(1):
            with pytest.raises(RuntimeError):
                with stream0:
                    pass
            with pytest.raises(RuntimeError):
                stream0.use()

    def test_mix_use_context(self):
        """Mix ``use()`` calls with ``with`` blocks and check restoration."""
        # See cupy/cupy#5143
        s1 = cuda.Stream()
        s2 = cuda.Stream()
        s3 = cuda.Stream()
        assert cuda.get_current_stream() == self.stream
        with s1:
            assert cuda.get_current_stream() == s1
            s2.use()
            assert cuda.get_current_stream() == s2
            with s3:
                assert cuda.get_current_stream() == s3
                del s2
            assert cuda.get_current_stream() == s1
        # self.stream is "forgotten"!
        assert cuda.get_current_stream() == cuda.Stream.null

    def test_stream_thread(self):
        """Share one stream between two threads, then drop it in both.

        Each worker clears its ``errors`` slot only if it runs to the end.
        A worker that fails aborts the barrier so that its peer is released
        instead of blocking forever in ``barrier.wait()``.
        """
        # Shared by both workers through the closure (hence ``nonlocal``).
        s1 = None

        def f1(barrier, errors):
            nonlocal s1
            tid = barrier.wait()
            try:
                s1 = cuda.Stream()
                barrier.wait()  # until t2 starts
                s1.use()
                barrier.wait()  # until t2 uses the stream
                s1 = None
                gc.collect()
                barrier.wait()  # until t2 decrefs the stream
                assert cuda.get_current_stream() is not None
                cupy.arange(10)
                errors[tid] = False
            except Exception as e:
                # Release the peer; otherwise join() below would hang.
                barrier.abort()
                print(f'error in {tid}: {e}')

        def f2(barrier, errors):
            nonlocal s1
            tid = barrier.wait()
            try:
                barrier.wait()  # until t1 creates the stream
                s1.use()
                barrier.wait()  # until t1 uses the stream
                s1 = None
                gc.collect()
                barrier.wait()  # until t1 decrefs the stream
                assert cuda.get_current_stream() is not None
                cupy.arange(10)
                errors[tid] = False
            except Exception as e:
                # Release the peer; otherwise join() below would hang.
                barrier.abort()
                print(f'error in {tid}: {e}')

        barrier = threading.Barrier(2)
        errors = [True, True]
        threads = [
            threading.Thread(target=f1, args=(barrier, errors), daemon=True),
            threading.Thread(target=f2, args=(barrier, errors), daemon=True),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        for err in errors:
            assert err is False


class TestExternalStream(unittest.TestCase):
    """Callback tests on a ``cuda.ExternalStream`` wrapping a raw stream."""

    def setUp(self):
        """Create a raw stream via the runtime and wrap its pointer."""
        raw_ptr = cuda.runtime.streamCreate()
        self.stream_ptr = raw_ptr
        self.stream = cuda.ExternalStream(raw_ptr)

    def tearDown(self):
        """Destroy the raw stream created in ``setUp``."""
        cuda.runtime.streamDestroy(self.stream_ptr)

    def test_get_and_add_callback(self):
        """Callbacks added to the external stream run in enqueue order."""
        count = 100
        device_arrays = [
            testing.shaped_random((2, 3)) for _ in range(count)]
        ext_stream = self.stream
        seen = []

        def _record(_stream, _status, payload):
            # Only the enqueue index (first payload item) is checked.
            seen.append(payload[0])

        for index, device_array in enumerate(device_arrays):
            host_array = device_array.get(stream=ext_stream)
            ext_stream.add_callback(_record, (index, host_array))

        ext_stream.synchronize()
        assert seen == list(range(count))

    @unittest.skipIf(cuda.runtime.is_hip,
                     'HIP does not support launch_host_func')
    def test_launch_host_func(self):
        """Host functions launched on the external stream run in order."""
        count = 100
        device_arrays = [
            testing.shaped_random((2, 3)) for _ in range(count)]
        ext_stream = self.stream
        seen = []

        def _record(payload):
            # Only the enqueue index (first payload item) is checked.
            seen.append(payload[0])

        for index, device_array in enumerate(device_arrays):
            host_array = device_array.get(stream=ext_stream)
            ext_stream.launch_host_func(_record, (index, host_array))

        ext_stream.synchronize()
        assert seen == list(range(count))