test_sessionize.py 4.23 KB
Newer Older
dugupeiwen's avatar
dugupeiwen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import unittest

from numba.cuda.testing import (CUDATestCase, skip_if_cudadevrt_missing,
                                skip_on_cudasim, skip_unless_cc_60,
                                skip_if_mvc_enabled)
from numba.tests.support import captured_stdout


@skip_if_cudadevrt_missing
@skip_unless_cc_60
@skip_if_mvc_enabled('CG not supported with MVC')
@skip_on_cudasim("cudasim doesn't support cuda import at non-top-level")
class TestSessionization(CUDATestCase):
    """
    Test click stream sessionization
    """

    def setUp(self):
        # Prevent output from this test showing up when running the test suite
        self._captured_stdout = captured_stdout()
        self._captured_stdout.__enter__()
        super().setUp()

    def tearDown(self):
        # No exception type, value, or traceback
        self._captured_stdout.__exit__(None, None, None)
        super().tearDown()

    def test_ex_sessionize(self):
        # ex_sessionize.import.begin
        import numpy as np
        from numba import cuda

        # Set the timeout to one hour
        session_timeout = np.int64(np.timedelta64("3600", "s"))
        # ex_sessionize.import.end

        # ex_sessionize.allocate.begin
        # Generate data
        ids = cuda.to_device(
            np.array(
                [
                    1, 1, 1, 1, 1, 1,
                    2, 2, 2,
                    3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
                    4, 4, 4, 4, 4, 4, 4, 4, 4,
                ]
            )
        )
        sec = cuda.to_device(
            np.array(
                [
                    1, 2, 3, 5000, 5001, 5002, 1,
                    2, 3, 1, 2, 5000, 5001, 10000,
                    10001, 10002, 10003, 15000, 150001,
                    1, 5000, 50001, 15000, 20000,
                    25000, 25001, 25002, 25003,
                ],
                dtype="datetime64[ns]",
            ).astype(
                "int64"
            )  # Cast to int64 for compatibility
        )
        # Create a vector to hold the results
        results = cuda.to_device(np.zeros(len(ids)))
        # ex_sessionize.allocate.end

        # ex_sessionize.kernel.begin
        @cuda.jit
        def sessionize(user_id, timestamp, results):
            gid = cuda.grid(1)
            size = len(user_id)

            if gid >= size:
                return

            # Determine session boundaries
            is_first_datapoint = gid == 0
            if not is_first_datapoint:
                new_user = user_id[gid] != user_id[gid - 1]
                timed_out = (
                    timestamp[gid] - timestamp[gid - 1] > session_timeout
                )
                is_sess_boundary = new_user or timed_out
            else:
                is_sess_boundary = True

            # Determine session labels
            if is_sess_boundary:
                # This thread marks the start of a session
                results[gid] = gid

                # Make sure all session boundaries are written
                # before populating the session id
                grid = cuda.cg.this_grid()
                grid.sync()

                look_ahead = 1
                # Check elements 'forward' of this one
                # until a new session boundary is found
                while results[gid + look_ahead] == 0:
                    results[gid + look_ahead] = gid
                    look_ahead += 1
                    # Avoid out-of-bounds accesses by the last thread
                    if gid + look_ahead == size - 1:
                        results[gid + look_ahead] = gid
                        break
        # ex_sessionize.kernel.end

        # ex_sessionize.launch.begin
        sessionize.forall(len(ids))(ids, sec, results)

        print(results.copy_to_host())
        # array([ 0.,  0.,  0.,  3.,  3.,  3.,
        #         6.,  6.,  6.,  9.,  9., 11.,
        #         11., 13., 13., 13., 13., 17.,
        #         18., 19., 20., 21., 21., 23.,
        #         24., 24., 24., 24.])
        # ex_sessionize.launch.end

        expect = [
            0, 0, 0, 3, 3, 3, 6, 6, 6, 9, 9,
            11, 11, 13, 13, 13, 13, 17, 18, 19, 20, 21,
            21, 23, 24, 24, 24, 24
        ]
        np.testing.assert_equal(expect, results.copy_to_host())


if __name__ == "__main__":
    unittest.main()