test_streams.py 4.1 KB
Newer Older
dugupeiwen's avatar
dugupeiwen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import functools
import threading
import numpy as np
from numba import cuda
from numba.cuda.testing import unittest, CUDATestCase, skip_on_cudasim


def with_asyncio_loop(f):
    """Wrap coroutine function *f* so it can be called synchronously.

    Each call of the returned wrapper creates a brand-new event loop with
    debug mode switched on, drives ``f(*args, **kwds)`` to completion on it
    and returns the coroutine's result. The loop is closed afterwards,
    whether the coroutine finished normally or raised.
    """
    @functools.wraps(f)
    def runner(*args, **kwds):
        event_loop = asyncio.new_event_loop()
        event_loop.set_debug(True)
        try:
            coro = f(*args, **kwds)
            result = event_loop.run_until_complete(coro)
        finally:
            # Always release the loop, even if the coroutine raised.
            event_loop.close()
        return result
    return runner


@skip_on_cudasim('CUDA Driver API unsupported in the simulator')
class TestCudaStream(CUDATestCase):
    """Tests for stream callbacks and the awaitable ``Stream.async_done()``."""

    def test_add_callback(self):
        """A callback added to a stream is invoked with the user argument."""
        def callback(stream, status, event):
            event.set()

        stream = cuda.stream()
        callback_event = threading.Event()
        stream.add_callback(callback, callback_event)
        self.assertTrue(callback_event.wait(1.0))

    def test_add_callback_with_default_arg(self):
        """Omitting the callback argument passes ``None`` to the callback."""
        callback_event = threading.Event()
        received_args = []

        def callback(stream, status, arg):
            # Only record the argument here. An assertion raised inside the
            # callback may not propagate to the test runner (the callback is
            # presumably run off the test thread, hence the Event), so the
            # actual check is made below on the test thread.
            received_args.append(arg)
            callback_event.set()

        stream = cuda.stream()
        stream.add_callback(callback)
        self.assertTrue(callback_event.wait(1.0))
        self.assertIsNone(received_args[0])

    @with_asyncio_loop
    async def test_async_done(self):
        """Awaiting ``async_done()`` on a fresh stream completes."""
        stream = cuda.stream()
        await stream.async_done()

    @with_asyncio_loop
    async def test_parallel_tasks(self):
        """Several tasks, each with its own stream, round-trip their data."""
        async def async_cuda_fn(value_in: float) -> float:
            stream = cuda.stream()
            h_src, h_dst = cuda.pinned_array(8), cuda.pinned_array(8)
            h_src[:] = value_in
            # Host -> device -> host, both copies queued on the same stream.
            d_ary = cuda.to_device(h_src, stream=stream)
            d_ary.copy_to_host(h_dst, stream=stream)
            done_result = await stream.async_done()
            self.assertEqual(done_result, stream)
            return h_dst.mean()

        values_in = [1, 2, 3, 4]
        tasks = [asyncio.create_task(async_cuda_fn(v)) for v in values_in]
        values_out = await asyncio.gather(*tasks)
        # assert_allclose reports the mismatching values on failure; the
        # tolerances match the defaults of np.allclose.
        np.testing.assert_allclose(values_out, values_in,
                                   rtol=1e-5, atol=1e-8)

    @with_asyncio_loop
    async def test_multiple_async_done(self):
        """Multiple ``async_done()`` awaitables on one stream all resolve."""
        stream = cuda.stream()
        done_aws = [stream.async_done() for _ in range(4)]
        done = await asyncio.gather(*done_aws)
        for d in done:
            self.assertEqual(d, stream)

    @with_asyncio_loop
    async def test_multiple_async_done_multiple_streams(self):
        """``async_done()`` on distinct streams resolves to those streams."""
        streams = [cuda.stream() for _ in range(4)]
        done_aws = [stream.async_done() for stream in streams]
        done = await asyncio.gather(*done_aws)

        # Ensure we got the four original streams in done
        self.assertSetEqual(set(done), set(streams))

    @with_asyncio_loop
    async def test_cancelled_future(self):
        """Cancelling one awaitable does not stop another from completing."""
        stream = cuda.stream()
        done1, done2 = stream.async_done(), stream.async_done()
        done1.cancel()
        await done2
        self.assertTrue(done1.cancelled())
        self.assertTrue(done2.done())


@skip_on_cudasim('CUDA Driver API unsupported in the simulator')
class TestFailingStream(CUDATestCase):
    """Checks that ``async_done()`` surfaces a failure of work on the stream."""

    # This test can only be run in isolation because it corrupts the CUDA
    # context, which cannot be recovered from within the same process. It is
    # left here so that it can be run manually for debugging / testing purposes
    # - or may be re-enabled if in future there is infrastructure added for
    # running tests in a separate process (a subprocess cannot be used because
    # CUDA will have been initialized before the fork, so it cannot be used in
    # the child process).
    #
    # unittest.skip() is documented as taking a reason; passing one explicitly
    # avoids relying on how a bare ``@unittest.skip`` is handled and makes the
    # skip self-explanatory in test reports.
    @unittest.skip("corrupts the CUDA context; run manually in isolation")
    @with_asyncio_loop
    async def test_failed_stream(self):
        """Launch a kernel that traps and expect awaiting the stream to raise."""
        ctx = cuda.current_context()
        module = ctx.create_module_ptx("""
            .version 6.5
            .target sm_30
            .address_size 64
            .visible .entry failing_kernel() { trap; }
        """)
        failing_kernel = module.get_function("failing_kernel")

        stream = cuda.stream()
        # One block of one thread, launched on the stream under test.
        launch = failing_kernel.configure((1,), (1,), stream=stream)
        launch()
        done = stream.async_done()
        with self.assertRaises(Exception):
            await done
        self.assertIsNotNone(done.exception())


# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()