Commit 5c70ef66 authored by dugupeiwen

update roc

parent 1fb0017a
from numba.testing import SerialSuite
from numba.testing import load_testsuite
import os
def load_tests(loader, tests, pattern):
return SerialSuite(load_testsuite(loader, os.path.dirname(__file__)))
import math
import numpy as np
import numba
def run_far_jump():
gt_as_str = 'float32'
R_EARTH = 6371.0 # km
@numba.roc.jit(device=True)
def deg2rad(deg):
return math.pi * deg / 180.0
sig = '%s(%s, %s, %s, %s)' % ((gt_as_str,) * 5)
@numba.vectorize(sig, target='roc')
def gpu_great_circle_distance(lat1, lng1, lat2, lng2):
'''Return the great-circle distance in km between (lat1, lng1) and (lat2, lng2)
on the surface of the Earth.'''
lat1, lng1 = deg2rad(lat1), deg2rad(lng1)
lat2, lng2 = deg2rad(lat2), deg2rad(lng2)
sin_lat1, cos_lat1 = math.sin(lat1), math.cos(lat1)
sin_lat2, cos_lat2 = math.sin(lat2), math.cos(lat2)
delta = lng1 - lng2
sin_delta, cos_delta = math.sin(delta), math.cos(delta)
numerator = math.sqrt((cos_lat1 * sin_delta)**2 +
(cos_lat1 * sin_lat2 - sin_lat1 * cos_lat2 * cos_delta)**2)
denominator = sin_lat1 * sin_lat2 + cos_lat1 * cos_lat2 * cos_delta
return R_EARTH * math.atan2(numerator, denominator)
arr = np.random.random(10).astype(np.float32)
gpu_great_circle_distance(arr, arr, arr, arr)
if __name__ == '__main__':
run_far_jump()
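# Illustrative host-side sketch (not part of the original script): the same
# atan2-based great-circle formula as the 'roc'-targeted ufunc above, written
# against plain NumPy so the device output can be spot-checked on the CPU.
# The 6371 km Earth radius mirrors R_EARTH in run_far_jump(); the helper name
# cpu_great_circle_distance is ours, not Numba's.
def cpu_great_circle_distance(lat1, lng1, lat2, lng2, r_earth=6371.0):
    lat1, lng1, lat2, lng2 = map(np.radians, (lat1, lng1, lat2, lng2))
    sin_lat1, cos_lat1 = np.sin(lat1), np.cos(lat1)
    sin_lat2, cos_lat2 = np.sin(lat2), np.cos(lat2)
    delta = lng1 - lng2
    sin_delta, cos_delta = np.sin(delta), np.cos(delta)
    numerator = np.sqrt((cos_lat1 * sin_delta) ** 2 +
                        (cos_lat1 * sin_lat2 - sin_lat1 * cos_lat2 * cos_delta) ** 2)
    denominator = sin_lat1 * sin_lat2 + cos_lat1 * cos_lat2 * cos_delta
    return r_earth * np.arctan2(numerator, denominator)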
"""
Test async kernel copy
"""
import logging
import numpy as np
from numba import roc
import unittest
from numba.roc.hsadrv.driver import dgpu_present
logger = logging.getLogger()
@unittest.skipUnless(dgpu_present, 'test only on dGPU system')
class TestAsyncKernel(unittest.TestCase):
def test_manual_stream(self):
logger.info('context info: %s', roc.get_context().agent)
@roc.jit("int32[:], int32[:]")
def add1_kernel(dst, src):
i = roc.get_global_id(0)
if i < dst.size:
dst[i] = src[i] + 1
blksz = 256
gridsz = 10**5
nitems = blksz * gridsz
ntimes = 500
arr = np.arange(nitems, dtype=np.int32)
logger.info('make coarse_arr')
coarse_arr = roc.coarsegrain_array(shape=arr.shape, dtype=arr.dtype)
coarse_arr[:] = arr
logger.info('make coarse_res_arr')
coarse_res_arr = roc.coarsegrain_array(shape=arr.shape, dtype=arr.dtype)
coarse_res_arr[:] = 0
logger.info("make stream")
stream = roc.stream()
logger.info('make gpu_res_arr')
gpu_res_arr = roc.device_array_like(coarse_arr)
logger.info('make gpu_arr')
gpu_arr = roc.to_device(coarse_arr, stream=stream)
for i in range(ntimes):
logger.info('launch kernel: %d', i)
add1_kernel[gridsz, blksz, stream](gpu_res_arr, gpu_arr)
gpu_arr.copy_to_device(gpu_res_arr, stream=stream)
logger.info('get kernel result')
gpu_res_arr.copy_to_host(coarse_res_arr, stream=stream)
logger.info("synchronize")
stream.synchronize()
logger.info("compare result")
np.testing.assert_equal(coarse_res_arr, coarse_arr + ntimes)
def test_ctx_managed_stream(self):
logger.info('context info: %s', roc.get_context().agent)
@roc.jit("int32[:], int32[:]")
def add1_kernel(dst, src):
i = roc.get_global_id(0)
if i < dst.size:
dst[i] = src[i] + 1
blksz = 256
gridsz = 10**5
nitems = blksz * gridsz
ntimes = 500
arr = np.arange(nitems, dtype=np.int32)
logger.info('make coarse_arr')
coarse_arr = roc.coarsegrain_array(shape=arr.shape, dtype=arr.dtype)
coarse_arr[:] = arr
logger.info('make coarse_res_arr')
coarse_res_arr = roc.coarsegrain_array(shape=arr.shape, dtype=arr.dtype)
coarse_res_arr[:] = 0
logger.info("make stream")
stream = roc.stream()
with stream.auto_synchronize():
logger.info('make gpu_res_arr')
gpu_res_arr = roc.device_array_like(coarse_arr)
logger.info('make gpu_arr')
gpu_arr = roc.to_device(coarse_arr, stream=stream)
for i in range(ntimes):
logger.info('launch kernel: %d', i)
add1_kernel[gridsz, blksz, stream](gpu_res_arr, gpu_arr)
gpu_arr.copy_to_device(gpu_res_arr, stream=stream)
logger.info('get kernel result')
gpu_res_arr.copy_to_host(coarse_res_arr, stream=stream)
logger.info("synchronize on ctx __exit__")
logger.info("compare result")
np.testing.assert_equal(coarse_res_arr, coarse_arr + ntimes)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
unittest.main()
import numpy as np
import numba
from numba import roc
import unittest
def atomic_add(ary):
tid = roc.get_local_id(0)
sm = roc.shared.array(32, numba.uint32)
sm[tid] = 0
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
bin = ary[tid] % 32
roc.atomic.add(sm, bin, 1)
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
ary[tid] = sm[tid]
def atomic_add2(ary):
tx = roc.get_local_id(0)
ty = roc.get_local_id(1)
sm = roc.shared.array((4, 8), numba.uint32)
sm[tx, ty] = ary[tx, ty]
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
roc.atomic.add(sm, (tx, ty), 1)
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
ary[tx, ty] = sm[tx, ty]
def atomic_add3(ary):
tx = roc.get_local_id(0)
ty = roc.get_local_id(1)
sm = roc.shared.array((4, 8), numba.uint32)
sm[tx, ty] = ary[tx, ty]
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
roc.atomic.add(sm, (tx, numba.uint64(ty)), 1)
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
ary[tx, ty] = sm[tx, ty]
class TestHsaAtomics(unittest.TestCase):
def test_atomic_add(self):
ary = np.random.randint(0, 32, size=32).astype(np.uint32)
orig = ary.copy()
hsa_atomic_add = roc.jit('void(uint32[:])')(atomic_add)
hsa_atomic_add[1, 32](ary)
gold = np.zeros(32, dtype=np.uint32)
for i in range(orig.size):
gold[orig[i]] += 1
self.assertTrue(np.all(ary == gold))
def test_atomic_add2(self):
ary = np.random.randint(0, 32, size=32).astype(np.uint32).reshape(4, 8)
orig = ary.copy()
hsa_atomic_add2 = roc.jit('void(uint32[:,:])')(atomic_add2)
hsa_atomic_add2[1, (4, 8)](ary)
self.assertTrue(np.all(ary == orig + 1))
def test_atomic_add3(self):
ary = np.random.randint(0, 32, size=32).astype(np.uint32).reshape(4, 8)
orig = ary.copy()
hsa_atomic_add3 = roc.jit('void(uint32[:,:])')(atomic_add3)
hsa_atomic_add3[1, (4, 8)](ary)
self.assertTrue(np.all(ary == orig + 1))
if __name__ == '__main__':
unittest.main()
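# Note (illustrative, not used by the tests above): the `gold` loop in
# test_atomic_add just builds a 32-bin histogram of the input values, so the
# same expected result can be written with np.bincount. cpu_histogram_32 is a
# hypothetical helper added here only for comparison.
def cpu_histogram_32(values):
    # values are uint32; the binning matches `bin = ary[tid] % 32` in the kernel
    return np.bincount(values % 32, minlength=32).astype(np.uint32)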
import numpy as np
import unittest
from numba import roc
def copy_kernel(out, inp):
i = roc.get_global_id(0)
if i < out.size:
out[i] = inp[i]
class TestAutoJit(unittest.TestCase):
def test_autojit_kernel(self):
kernel = roc.jit(copy_kernel)
inp = np.arange(10)
out = np.zeros_like(inp)
kernel.forall(out.size)(out, inp)
np.testing.assert_equal(inp, out)
def test_autojit_device(self):
@roc.jit(device=True)
def inner(a, b):
return a + b
@roc.jit
def outer(A, B):
i = roc.get_global_id(0)
if i < A.size:
A[i] = inner(A[i], B[i])
A = np.arange(10)
Aorig = A.copy()
B = np.arange(10)
outer.forall(A.size)(A, B)
self.assertFalse(np.all(Aorig == A))
np.testing.assert_equal(Aorig + B, A)
if __name__ == '__main__':
unittest.main()
import numpy as np
from numba import roc, float32
import unittest
class TestBarrier(unittest.TestCase):
def test_proper_lowering(self):
@roc.jit("void(float32[::1])")
def twice(A):
i = roc.get_global_id(0)
d = A[i]
roc.barrier(roc.CLK_LOCAL_MEM_FENCE) # local mem fence
A[i] = d * 2
N = 256
arr = np.random.random(N).astype(np.float32)
orig = arr.copy()
twice[2, 128](arr)
# Assembly contains barrier instruction?
self.assertIn("s_barrier", twice.assembly)
# The computation is correct?
np.testing.assert_allclose(orig * 2, arr)
def test_no_arg_barrier_support(self):
@roc.jit("void(float32[::1])")
def twice(A):
i = roc.get_global_id(0)
d = A[i]
# no argument defaults to global mem fence
# which is the same for local in hsail
roc.barrier()
A[i] = d * 2
N = 256
arr = np.random.random(N).astype(np.float32)
orig = arr.copy()
twice[2, 128](arr)
# Assembly contains barrier instruction?
self.assertIn("s_barrier", twice.assembly)
# The computation is correct?
np.testing.assert_allclose(orig * 2, arr)
def test_local_memory(self):
blocksize = 10
@roc.jit("void(float32[::1])")
def reverse_array(A):
sm = roc.shared.array(shape=blocksize, dtype=float32)
i = roc.get_global_id(0)
# preload
sm[i] = A[i]
# barrier
roc.barrier(roc.CLK_LOCAL_MEM_FENCE) # local mem fence
# write
A[i] += sm[blocksize - 1 - i]
arr = np.arange(blocksize).astype(np.float32)
orig = arr.copy()
reverse_array[1, blocksize](arr)
expected = orig[::-1] + orig
np.testing.assert_allclose(expected, arr)
if __name__ == '__main__':
unittest.main()
import tempfile
import os
import numpy as np
import unittest
from numba import roc
from numba.core import types
from numba.roc import compiler
from numba.roc.hsadrv.driver import hsa as hsart
from numba.roc.hsadrv.driver import BrigModule, Executable, Program
def copy_kernel(out, inp):
out[0] = inp[0]
def copy_kernel_1d(out, inp):
i = roc.get_global_id(0)
if i < out.size:
out[i] = inp[i]
def assign_value(out, inp):
i = roc.get_global_id(0)
if i < out.size:
out[i] = inp
class TestCodeGeneration(unittest.TestCase):
def test_copy_kernel(self):
arytype = types.float32[:]
kernel = compiler.compile_kernel(copy_kernel, [arytype] * 2)
self.assertIn(".globl\t{0}".format(kernel.entry_name),
kernel.assembly)
def test_copy_kernel_1d(self):
arytype = types.float32[:]
kernel = compiler.compile_kernel(copy_kernel_1d, [arytype] * 2)
self.assertIn(".globl\t{0}".format(kernel.entry_name),
kernel.assembly)
class _TestBase(unittest.TestCase):
def setUp(self):
self.gpu = [a for a in hsart.agents if a.is_component][0]
self.cpu = [a for a in hsart.agents if not a.is_component][0]
self.queue = self.gpu.create_queue_multi(self.gpu.queue_max_size)
def tearDown(self):
del self.queue
del self.gpu
del self.cpu
class TestExecution(unittest.TestCase):
def test_hsa_kernel(self):
src = np.arange(1024, dtype=np.float32)
dst = np.zeros_like(src)
# Compile kernel
arytype = types.float32[::1]
kernel = compiler.compile_kernel(copy_kernel_1d, [arytype] * 2)
# Run kernel
kernel[src.size // 256, 256](dst, src)
np.testing.assert_equal(src, dst)
class TestKernelArgument(unittest.TestCase):
def _test_template(self, nbtype, src):
dtype = np.dtype(str(nbtype))
dst = np.zeros(1, dtype=dtype)
src = dtype.type(src)
arytype = nbtype[::1]
kernel = compiler.compile_kernel(assign_value, [arytype, nbtype])
kernel[1, 1](dst, src)
self.assertEqual(dst[0], src)
def test_float64(self):
self._test_template(nbtype=types.float64, src=1. / 3.)
def test_float32(self):
self._test_template(nbtype=types.float32, src=1. / 3.)
def test_int32(self):
self._test_template(nbtype=types.int32, src=123)
def test_int16(self):
self._test_template(nbtype=types.int16, src=123)
def test_complex64(self):
self._test_template(nbtype=types.complex64, src=12 + 34j)
def test_complex128(self):
self._test_template(nbtype=types.complex128, src=12 + 34j)
def udt_devfunc(a, i):
return a[i]
class TestDeviceFunction(unittest.TestCase):
def test_device_function(self):
src = np.arange(10, dtype=np.int32)
dst = np.zeros_like(src)
arytype = types.int32[::1]
devfn = compiler.compile_device(udt_devfunc, arytype.dtype,
[arytype, types.intp])
def udt_devfunc_caller(dst, src):
i = roc.get_global_id(0)
if i < dst.size:
dst[i] = devfn(src, i)
kernel = compiler.compile_kernel(udt_devfunc_caller,
[arytype, arytype])
kernel[src.size, 1](dst, src)
np.testing.assert_equal(dst, src)
if __name__ == '__main__':
unittest.main()
import numpy as np
import unittest
from numba import roc
class TestDecorators(unittest.TestCase):
def test_kernel_jit(self):
@roc.jit("(float32[:], float32[:])")
def copy_vector(dst, src):
tid = roc.get_global_id(0)
if tid < dst.size:
dst[tid] = src[tid]
src = np.arange(10, dtype=np.float32)
dst = np.zeros_like(src)
copy_vector[10, 1](dst, src)
np.testing.assert_equal(dst, src)
def test_device_jit(self):
@roc.jit("float32(float32[:], intp)", device=True)
def inner(src, idx):
return src[idx]
@roc.jit("(float32[:], float32[:])")
def outer(dst, src):
tid = roc.get_global_id(0)
if tid < dst.size:
dst[tid] = inner(src, tid)
src = np.arange(10, dtype=np.float32)
dst = np.zeros_like(src)
outer[10, 1](dst, src)
np.testing.assert_equal(dst, src)
def test_autojit_kernel(self):
@roc.jit
def copy_vector(dst, src):
tid = roc.get_global_id(0)
if tid < dst.size:
dst[tid] = src[tid]
for dtype in [np.uint32, np.float32]:
src = np.arange(10, dtype=dtype)
dst = np.zeros_like(src)
copy_vector[10, 1](dst, src)
np.testing.assert_equal(dst, src)
if __name__ == '__main__':
unittest.main()
import numpy as np
from numba.roc.vectorizers import HsaGUFuncVectorize
from numba.roc.dispatch import HSAGenerializedUFunc
from numba import guvectorize
import unittest
def ufunc_add_core(a, b, c):
for i in range(c.size):
c[i] = a[i] + b[i]
class TestGUFuncBuilding(unittest.TestCase):
def test_gufunc_building(self):
ufbldr = HsaGUFuncVectorize(ufunc_add_core, "(x),(x)->(x)")
ufbldr.add("(float32[:], float32[:], float32[:])")
ufbldr.add("(intp[:], intp[:], intp[:])")
ufunc = ufbldr.build_ufunc()
self.assertIsInstance(ufunc, HSAGenerializedUFunc)
# Test integer version
A = np.arange(100, dtype=np.intp)
B = np.arange(100, dtype=np.intp) + 1
expected = A + B
got = ufunc(A, B)
np.testing.assert_equal(expected, got)
self.assertEqual(expected.dtype, got.dtype)
self.assertEqual(np.dtype(np.intp), got.dtype)
# Test integer version with 2D inputs
A = A.reshape(50, 2)
B = B.reshape(50, 2)
expected = A + B
got = ufunc(A, B)
np.testing.assert_equal(expected, got)
self.assertEqual(expected.dtype, got.dtype)
self.assertEqual(np.dtype(np.intp), got.dtype)
# Test integer version with 3D inputs
A = A.reshape(5, 10, 2)
B = B.reshape(5, 10, 2)
expected = A + B
got = ufunc(A, B)
np.testing.assert_equal(expected, got)
self.assertEqual(expected.dtype, got.dtype)
self.assertEqual(np.dtype(np.intp), got.dtype)
# Test real version
A = np.arange(100, dtype=np.float32)
B = np.arange(100, dtype=np.float32) + 1
expected = A + B
got = ufunc(A, B)
np.testing.assert_allclose(expected, got)
self.assertEqual(expected.dtype, got.dtype)
self.assertEqual(np.dtype(np.float32), got.dtype)
# Test real version with 2D inputs
A = A.reshape(50, 2)
B = B.reshape(50, 2)
expected = A + B
got = ufunc(A, B)
np.testing.assert_allclose(expected, got)
self.assertEqual(expected.dtype, got.dtype)
self.assertEqual(np.dtype(np.float32), got.dtype)
def test_gufunc_building_scalar_output(self):
def sum_row(inp, out):
tmp = 0.
for i in range(inp.shape[0]):
tmp += inp[i]
out[0] = tmp
ufbldr = HsaGUFuncVectorize(sum_row, "(n)->()")
ufbldr.add("void(int32[:], int32[:])")
ufunc = ufbldr.build_ufunc()
inp = np.arange(300, dtype=np.int32).reshape(100, 3)
out = ufunc(inp)
for i in range(inp.shape[0]):
np.testing.assert_equal(inp[i].sum(), out[i])
def test_gufunc_scalar_input_saxpy(self):
def axpy(a, x, y, out):
for i in range(out.shape[0]):
out[i] = a * x[i] + y[i]
ufbldr = HsaGUFuncVectorize(axpy, '(),(t),(t)->(t)')
ufbldr.add("void(float32, float32[:], float32[:], float32[:])")
saxpy = ufbldr.build_ufunc()
A = np.float32(2)
X = np.arange(10, dtype=np.float32).reshape(5, 2)
Y = np.arange(10, dtype=np.float32).reshape(5, 2)
out = saxpy(A, X, Y)
for j in range(5):
for i in range(2):
exp = A * X[j, i] + Y[j, i]
self.assertTrue(exp == out[j, i])
X = np.arange(10, dtype=np.float32)
Y = np.arange(10, dtype=np.float32)
out = saxpy(A, X, Y)
for j in range(10):
exp = A * X[j] + Y[j]
self.assertTrue(exp == out[j], (exp, out[j]))
A = np.arange(5, dtype=np.float32)
X = np.arange(10, dtype=np.float32).reshape(5, 2)
Y = np.arange(10, dtype=np.float32).reshape(5, 2)
out = saxpy(A, X, Y)
for j in range(5):
for i in range(2):
exp = A[j] * X[j, i] + Y[j, i]
self.assertTrue(exp == out[j, i], (exp, out[j, i]))
class TestGUFuncDecor(unittest.TestCase):
def test_gufunc_decorator(self):
@guvectorize(["void(float32, float32[:], float32[:], float32[:])"],
'(),(t),(t)->(t)', target='roc')
def saxpy(a, x, y, out):
for i in range(out.shape[0]):
out[i] = a * x[i] + y[i]
A = np.float32(2)
X = np.arange(10, dtype=np.float32).reshape(5, 2)
Y = np.arange(10, dtype=np.float32).reshape(5, 2)
out = saxpy(A, X, Y)
for j in range(5):
for i in range(2):
exp = A * X[j, i] + Y[j, i]
self.assertTrue(exp == out[j, i])
X = np.arange(10, dtype=np.float32)
Y = np.arange(10, dtype=np.float32)
out = saxpy(A, X, Y)
for j in range(10):
exp = A * X[j] + Y[j]
self.assertTrue(exp == out[j], (exp, out[j]))
A = np.arange(5, dtype=np.float32)
X = np.arange(10, dtype=np.float32).reshape(5, 2)
Y = np.arange(10, dtype=np.float32).reshape(5, 2)
out = saxpy(A, X, Y)
for j in range(5):
for i in range(2):
exp = A[j] * X[j, i] + Y[j, i]
self.assertTrue(exp == out[j, i], (exp, out[j, i]))
if __name__ == '__main__':
unittest.main()
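# Sketch for comparison (not exercised above): the same saxpy core compiled
# with Numba's default CPU target and the identical '(),(t),(t)->(t)' layout,
# to show that the scalar `a` broadcasts against the trailing `t` dimension
# exactly as in the 'roc'-targeted versions. cpu_saxpy is an added name, not
# part of the original test module.
@guvectorize(["void(float32, float32[:], float32[:], float32[:])"],
             '(),(t),(t)->(t)')
def cpu_saxpy(a, x, y, out):
    for i in range(out.shape[0]):
        out[i] = a * x[i] + y[i]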
import numpy as np
from numba import roc
from numba.core.errors import TypingError
import operator as oper
import unittest
_WAVESIZE = roc.get_context().agent.wavefront_size
@roc.jit(device=True)
def shuffle_up(val, width):
tid = roc.get_local_id(0)
roc.wavebarrier()
idx = (tid + width) % _WAVESIZE
res = roc.ds_permute(idx, val)
return res
@roc.jit(device=True)
def shuffle_down(val, width):
tid = roc.get_local_id(0)
roc.wavebarrier()
idx = (tid - width) % _WAVESIZE
res = roc.ds_permute(idx, val)
return res
@roc.jit(device=True)
def broadcast(val, from_lane):
tid = roc.get_local_id(0)
roc.wavebarrier()
res = roc.ds_bpermute(from_lane, val)
return res
def gen_kernel(shuffunc):
@roc.jit
def kernel(inp, outp, amount):
tid = roc.get_local_id(0)
val = inp[tid]
outp[tid] = shuffunc(val, amount)
return kernel
class TestDsPermute(unittest.TestCase):
def test_ds_permute(self):
inp = np.arange(_WAVESIZE).astype(np.int32)
outp = np.zeros_like(inp)
for shuffler, op in [(shuffle_down, oper.neg), (shuffle_up, oper.pos)]:
kernel = gen_kernel(shuffler)
for shuf in range(-_WAVESIZE, _WAVESIZE):
kernel[1, _WAVESIZE](inp, outp, shuf)
np.testing.assert_allclose(outp, np.roll(inp, op(shuf)))
def test_ds_permute_random_floats(self):
inp = np.linspace(0, 1, _WAVESIZE).astype(np.float32)
outp = np.zeros_like(inp)
for shuffler, op in [(shuffle_down, oper.neg), (shuffle_up, oper.pos)]:
kernel = gen_kernel(shuffler)
for shuf in range(-_WAVESIZE, _WAVESIZE):
kernel[1, _WAVESIZE](inp, outp, shuf)
np.testing.assert_allclose(outp, np.roll(inp, op(shuf)))
def test_ds_permute_type_safety(self):
""" Checks that float64's are not being downcast to float32"""
kernel = gen_kernel(shuffle_down)
inp = np.linspace(0, 1, _WAVESIZE).astype(np.float64)
outp = np.zeros_like(inp)
with self.assertRaises(TypingError) as e:
kernel[1, _WAVESIZE](inp, outp, 1)
errmsg = e.exception.msg
self.assertIn('Invalid use of Function', errmsg)
self.assertIn('with argument(s) of type(s): (float64, int64)', errmsg)
def test_ds_bpermute(self):
@roc.jit
def kernel(inp, outp, lane):
tid = roc.get_local_id(0)
val = inp[tid]
outp[tid] = broadcast(val, lane)
inp = np.arange(_WAVESIZE).astype(np.int32)
outp = np.zeros_like(inp)
for lane in range(0, _WAVESIZE):
kernel[1, _WAVESIZE](inp, outp, lane)
np.testing.assert_allclose(outp, lane)
def test_ds_bpermute_random_floats(self):
@roc.jit
def kernel(inp, outp, lane):
tid = roc.get_local_id(0)
val = inp[tid]
outp[tid] = broadcast(val, lane)
inp = np.linspace(0, 1, _WAVESIZE).astype(np.float32)
outp = np.zeros_like(inp)
for lane in range(0, _WAVESIZE):
kernel[1, _WAVESIZE](inp, outp, lane)
np.testing.assert_allclose(outp, inp[lane])
if __name__ == '__main__':
unittest.main()
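# Host-side model (illustrative only) of what TestDsPermute asserts about the
# wrappers above: within one wavefront, shuffle_up by `width` behaves like
# np.roll(wave, +width) and shuffle_down like np.roll(wave, -width). This
# mirrors the test expectations rather than documenting ds_permute itself.
def cpu_shuffle_up(wave, width):
    return np.roll(wave, width)


def cpu_shuffle_down(wave, width):
    return np.roll(wave, -width)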
import sys
import os
import os.path
import subprocess
import math
import numba
import unittest
class TestLargeCode(unittest.TestCase):
def test_far_jump(self):
from numba.roc.tests.hsapy import run_far_branch
pyinterp = sys.executable
numba_dir = os.path.abspath(os.path.join(os.path.dirname(numba.__file__), os.pardir))
script, ext = os.path.splitext(os.path.relpath(run_far_branch.__file__, numba_dir))
script = script.replace(os.path.sep, '.')
args = [pyinterp, script]
cmd = '{} -m {}'.format(*args)
oldpp = os.environ.get('PYTHONPATH')
os.environ['PYTHONPATH'] = numba_dir
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
finally:
if oldpp is None:
del os.environ['PYTHONPATH']
else:
os.environ['PYTHONPATH'] = oldpp
if __name__ == '__main__':
unittest.main()
import unittest
from numba import roc
class TestLinkage(unittest.TestCase):
def test_indirection(self):
@roc.jit(device=True)
def base():
pass
@roc.jit(device=True)
def layer1():
base()
@roc.jit(device=True)
def layer2():
layer1()
base()
@roc.jit
def kernel(a):
layer2()
kernel[1, 1](1)
if __name__ == '__main__':
unittest.main()
import numpy as np
import math
import unittest
from numba import roc
from numba.core import utils
class TestMath(unittest.TestCase):
def _get_tol(self, math_fn, ty):
"""gets the tolerance for functions when the input is of type 'ty'"""
low_res = {
(math.gamma, np.float64): 1e-14,
(math.lgamma, np.float64): 1e-13,
(math.asin, np.float64): 1e-9,
(math.acos, np.float64): 4e-9,
(math.sqrt, np.float64): 2e-8,
}
default = 1e-15 if ty == np.float64 else 1e-6
return low_res.get((math_fn, ty), default)
def _generic_test_unary(self, math_fn, npy_fn,
cases=None,
span=(-1., 1.), count=128,
types=(np.float32, np.float64)):
@roc.jit
def fn(dst, src):
i = roc.get_global_id(0)
if i < dst.size:
dst[i] = math_fn(src[i])
for dtype in types:
if cases is None:
src = np.linspace(span[0], span[1], count).astype(dtype)
else:
src = np.array(cases, dtype=dtype)
dst = np.zeros_like(src)
fn[src.size, 1](dst, src)
np.testing.assert_allclose(dst, npy_fn(src),
rtol=self._get_tol(math_fn, dtype),
err_msg='{0} ({1})'.format(
math_fn.__name__,
dtype.__name__))
def _generic_test_binary(self, math_fn, npy_fn,
cases=None,
span=(-1., 1., 1., -1.), count=128,
types=(np.float32, np.float64)):
@roc.jit
def fn(dst, src1, src2):
i = roc.get_global_id(0)
if i < dst.size:
dst[i] = math_fn(src1[i], src2[i])
for dtype in types:
if cases is None:
src1 = np.linspace(span[0], span[1], count).astype(dtype)
src2 = np.linspace(span[2], span[3], count).astype(dtype)
else:
src1 = np.array(cases[0], dtype=dtype)
src2 = np.array(cases[1], dtype=dtype)
dst = np.zeros_like(src1)
fn[dst.size, 1](dst, src1, src2)
np.testing.assert_allclose(dst, npy_fn(src1, src2),
rtol=self._get_tol(math_fn, dtype),
err_msg='{0} ({1})'.format(
math_fn.__name__,
dtype.__name__))
def test_trig(self):
funcs = [math.sin, math.cos, math.tan]
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(-np.pi, np.pi))
def test_trig_inv(self):
funcs = [(math.asin, np.arcsin),
(math.acos, np.arccos),
(math.atan, np.arctan)]
for fn, np_fn in funcs:
self._generic_test_unary(fn, np_fn)
def test_trigh(self):
funcs = [math.sinh, math.cosh, math.tanh]
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(-4.0, 4.0))
def test_trigh_inv(self):
funcs = [(math.asinh, np.arcsinh, (-4, 4)),
(math.acosh, np.arccosh, (1, 9)),
(math.atanh, np.arctanh, (-0.9, 0.9))]
for fn, np_fn, span in funcs:
self._generic_test_unary(fn, np_fn, span=span)
def test_classify(self):
funcs = [math.isnan, math.isinf]
cases = (float('nan'), float('inf'), float('-inf'), float('-nan'),
0, 3, -2)
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
cases=cases)
def test_floor_ceil(self):
funcs = [math.ceil, math.floor]
for fn in funcs:
# cases with varied decimals
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(-1013.14, 843.21))
# cases that include "exact" integers
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(-16, 16), count=129)
def test_fabs(self):
funcs = [math.fabs]
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(-63.3, 63.3))
def test_unary_exp(self):
funcs = [math.exp]
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(-30, 30))
def test_unary_expm1(self):
funcs = [math.expm1]
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(-30, 30))
def test_sqrt(self):
funcs = [math.sqrt]
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(0, 1000))
def test_log(self):
funcs = [math.log, math.log10, math.log1p]
for fn in funcs:
self._generic_test_unary(fn, getattr(np, fn.__name__),
span=(0.1, 2500))
def test_binaries(self):
funcs = [math.copysign, math.fmod]
for fn in funcs:
self._generic_test_binary(fn, getattr(np, fn.__name__))
def test_pow(self):
funcs = [(math.pow, np.power)]
for fn, npy_fn in funcs:
self._generic_test_binary(fn, npy_fn)
def test_atan2(self):
funcs = [(math.atan2, np.arctan2)]
for fn, npy_fn in funcs:
self._generic_test_binary(fn, npy_fn)
def test_erf(self):
funcs = [math.erf, math.erfc]
for fn in funcs:
self._generic_test_unary(fn, np.vectorize(fn))
def test_gamma(self):
funcs = [math.gamma, math.lgamma]
for fn in funcs:
self._generic_test_unary(fn, np.vectorize(fn), span=(1e-4, 4.0))
if __name__ == '__main__':
unittest.main()
from timeit import default_timer as timer
import numpy as np
from numba import roc, float32
from numba.roc.hsadrv.error import HsaKernelLaunchError
import unittest
class TestMatMul(unittest.TestCase):
def test_matmul_naive(self):
@roc.jit
def matmul(A, B, C):
i = roc.get_global_id(0)
j = roc.get_global_id(1)
if i >= C.shape[0] or j >= C.shape[1]:
return
tmp = 0
for k in range(A.shape[1]):
tmp += A[i, k] * B[k, j]
C[i, j] = tmp
N = 256
A = np.random.random((N, N)).astype(np.float32)
B = np.random.random((N, N)).astype(np.float32)
C = np.zeros_like(A)
with roc.register(A, B, C):
ts = timer()
matmul[(N // 16, N // 16), (16, 16)](A, B, C)
te = timer()
print("1st GPU time:", te - ts)
with roc.register(A, B, C):
ts = timer()
matmul[(N // 16, N // 16), (16, 16)](A, B, C)
te = timer()
print("2nd GPU time:", te - ts)
ts = timer()
ans = np.dot(A, B)
te = timer()
print("CPU time:", te - ts)
np.testing.assert_allclose(ans, C, rtol=1e-5)
def check_matmul_fast(self, gridsize, blocksize):
@roc.jit
def matmulfast(A, B, C):
x = roc.get_global_id(0)
y = roc.get_global_id(1)
tx = roc.get_local_id(0)
ty = roc.get_local_id(1)
sA = roc.shared.array(shape=(blocksize, blocksize), dtype=float32)
sB = roc.shared.array(shape=(blocksize, blocksize), dtype=float32)
if x >= C.shape[0] or y >= C.shape[1]:
return
tmp = 0
for i in range(gridsize):
# preload
sA[tx, ty] = A[x, ty + i * blocksize]
sB[tx, ty] = B[tx + i * blocksize, y]
# wait for preload to end
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
# compute loop
for j in range(blocksize):
tmp += sA[tx, j] * sB[j, ty]
# wait for compute to end
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
C[x, y] = tmp
N = gridsize * blocksize
A = np.random.random((N, N)).astype(np.float32)
B = np.random.random((N, N)).astype(np.float32)
C = np.zeros_like(A)
griddim = gridsize, gridsize
blockdim = blocksize, blocksize
with roc.register(A, B, C):
ts = timer()
matmulfast[griddim, blockdim](A, B, C)
te = timer()
print("1st GPU time:", te - ts)
with roc.register(A, B, C):
ts = timer()
matmulfast[griddim, blockdim](A, B, C)
te = timer()
print("2nd GPU time:", te - ts)
ts = timer()
ans = np.dot(A, B)
te = timer()
print("CPU time:", te - ts)
np.testing.assert_allclose(ans, C, rtol=1e-5)
def test_matmul_fast(self):
self.check_matmul_fast(gridsize=8, blocksize=8)
def test_matmul_fast_insufficient_resources(self):
with self.assertRaises(HsaKernelLaunchError):
self.check_matmul_fast(gridsize=8, blocksize=20)
if __name__ == '__main__':
unittest.main()
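# Sketch (not part of the tests above): the blocking scheme that matmulfast
# implements on the GPU, written with NumPy slices for a square N divisible by
# blocksize. Each (blocksize x blocksize) tile of C accumulates one k-tile of
# A against one k-tile of B per iteration, which is what the preload /
# barrier / compute loop does through shared memory.
def cpu_blocked_matmul(A, B, blocksize):
    N = A.shape[0]
    C = np.zeros_like(A)
    for bx in range(0, N, blocksize):
        for by in range(0, N, blocksize):
            for bk in range(0, N, blocksize):
                C[bx:bx + blocksize, by:by + blocksize] += (
                    A[bx:bx + blocksize, bk:bk + blocksize] @
                    B[bk:bk + blocksize, by:by + blocksize])
    return C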
"""
Test arrays backed by different memory
"""
import logging
import numpy as np
from numba import roc
import unittest
from numba.roc.hsadrv.driver import dgpu_present
logger = logging.getLogger()
@roc.jit
def copy_kernel(dst, src):
i = roc.get_global_id(0)
if i < dst.size:
dst[i] = src[i]
@unittest.skipUnless(dgpu_present, 'test only on dGPU system')
class TestMemory(unittest.TestCase):
def test_auto_device(self):
blkct = 4
blksz = 128
nelem = blkct * blksz
expect = np.arange(nelem) + 1
got = np.zeros_like(expect)
copy_kernel[blkct, blksz](got, expect.copy())
np.testing.assert_equal(got, expect)
def test_device_array(self):
blkct = 4
blksz = 128
nelem = blkct * blksz
expect = np.arange(nelem) + 1
logger.info('device array like')
darr = roc.device_array_like(expect)
logger.info('pre launch')
copy_kernel[blkct, blksz](darr, roc.to_device(expect))
logger.info('post launch')
got = darr.copy_to_host()
np.testing.assert_equal(got, expect)
def test_coarsegrain_array(self):
blkct = 4
blksz = 128
nelem = blkct * blksz
expect = np.arange(nelem) + 1
logger.info('coarsegrain array')
got = roc.coarsegrain_array(shape=expect.shape, dtype=expect.dtype)
got.fill(0)
logger.info('pre launch')
copy_kernel[blkct, blksz](got, expect.copy())
logger.info('post launch')
np.testing.assert_equal(got, expect)
def test_finegrain_array(self):
blkct = 4
blksz = 128
nelem = blkct * blksz
expect = np.arange(nelem) + 1
logger.info('finegrain array')
got = roc.finegrain_array(shape=expect.shape, dtype=expect.dtype)
got.fill(0)
logger.info('pre launch')
copy_kernel[blkct, blksz](got, expect.copy())
logger.info('post launch')
np.testing.assert_equal(got, expect)
@unittest.skipUnless(dgpu_present, 'test only on dGPU system')
class TestDeviceMemory(unittest.TestCase):
def test_device_device_transfer(self):
# This has to be run in isolation and before the above
# TODO: investigate why?!
nelem = 1000
expect = np.arange(nelem, dtype=np.int32) + 1
logger.info('device array like')
darr = roc.device_array_like(expect)
self.assertTrue(np.all(expect != darr.copy_to_host()))
logger.info('to_device')
stage = roc.to_device(expect)
logger.info('device -> device')
darr.copy_to_device(stage)
np.testing.assert_equal(expect, darr.copy_to_host())
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
unittest.main()
from numba.roc.gcn_occupancy import get_limiting_factors
import unittest
class TestOccupancy(unittest.TestCase):
def check_limits(self, inputs, expected_outputs):
outputs = get_limiting_factors(**inputs)
for k, expect in expected_outputs.items():
got = getattr(outputs, k)
if k == 'occupancy':
self.assertAlmostEqual(got, expect, msg=k)
else:
self.assertEqual(got, expect, k)
def test_limits_1(self):
inputs = dict(group_size=400,
vgpr_per_workitem=139,
sgpr_per_wave=49)
outputs = dict(
allowed_wave_due_to_sgpr=10,
allowed_wave_due_to_vgpr=1,
allowed_wave=1,
allowed_vgpr_per_workitem=128,
occupancy=0,
reasons=set(['allowed_wave_due_to_vgpr',
'allowed_wave',
'group_size']),
)
self.check_limits(inputs, outputs)
def test_limits_2(self):
inputs = dict(group_size=256,
vgpr_per_workitem=139,
sgpr_per_wave=49)
outputs = dict(
allowed_wave_due_to_sgpr=10,
allowed_wave_due_to_vgpr=1,
allowed_wave=1,
allowed_vgpr_per_workitem=256,
occupancy=.10,
reasons=set(),
)
self.check_limits(inputs, outputs)
def test_limits_3(self):
inputs = dict(group_size=2048,
vgpr_per_workitem=16,
sgpr_per_wave=70)
outputs = dict(
allowed_wave_due_to_sgpr=7,
allowed_wave_due_to_vgpr=16,
allowed_wave=7,
allowed_vgpr_per_workitem=32,
occupancy=0,
reasons=set(['allowed_wave_due_to_sgpr',
'allowed_wave',
'group_size']),
)
self.check_limits(inputs, outputs)
def test_limits_4(self):
inputs = dict(group_size=2048,
vgpr_per_workitem=32,
sgpr_per_wave=50)
outputs = dict(
allowed_wave_due_to_sgpr=10,
allowed_wave_due_to_vgpr=8,
allowed_wave=8,
allowed_vgpr_per_workitem=32,
occupancy=0,
reasons=set(['group_size']),
)
self.check_limits(inputs, outputs)
def test_limits_5(self):
inputs = dict(group_size=4,
vgpr_per_workitem=128,
sgpr_per_wave=10)
outputs = dict(
allowed_wave_due_to_sgpr=51,
allowed_wave_due_to_vgpr=2,
allowed_wave=2,
allowed_vgpr_per_workitem=256,
occupancy=.1,
reasons=set(),
)
self.check_limits(inputs, outputs)
def test_limits_6(self):
inputs = dict(group_size=4,
vgpr_per_workitem=257,
sgpr_per_wave=3)
outputs = dict(
allowed_wave_due_to_sgpr=170,
allowed_wave_due_to_vgpr=0,
allowed_wave=0,
allowed_vgpr_per_workitem=256,
occupancy=0,
reasons=set(['allowed_wave_due_to_vgpr',
'allowed_wave']),
)
self.check_limits(inputs, outputs)
if __name__ == '__main__':
unittest.main()
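# Rough sketch (assumptions inferred from the expected outputs above, not the
# library's implementation): the allowed_wave_due_to_* figures are consistent
# with fixed per-wave register budgets of 512 SGPRs and 256 VGPRs, with the
# overall wave limit being the minimum of the two, e.g. 512 // 49 == 10 and
# 256 // 139 == 1 in test_limits_1.
def sketch_wave_limits(vgpr_per_workitem, sgpr_per_wave,
                       sgpr_budget=512, vgpr_budget=256):
    allowed_wave_due_to_sgpr = sgpr_budget // sgpr_per_wave
    allowed_wave_due_to_vgpr = vgpr_budget // vgpr_per_workitem
    return min(allowed_wave_due_to_sgpr, allowed_wave_due_to_vgpr)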
import numpy as np
from numba import roc
import unittest
class TestPositioning(unittest.TestCase):
def test_kernel_jit(self):
@roc.jit
def udt(output):
global_id = roc.get_global_id(0)
global_size = roc.get_global_size(0)
local_id = roc.get_local_id(0)
group_id = roc.get_group_id(0)
num_groups = roc.get_num_groups(0)
workdim = roc.get_work_dim()
local_size = roc.get_local_size(0)
output[0, group_id, local_id] = global_id
output[1, group_id, local_id] = global_size
output[2, group_id, local_id] = local_id
output[3, group_id, local_id] = local_size
output[4, group_id, local_id] = group_id
output[5, group_id, local_id] = num_groups
output[6, group_id, local_id] = workdim
out = np.zeros((7, 2, 3), dtype=np.intp)
udt[2, 3](out)
np.testing.assert_equal([[0, 1, 2], [3, 4, 5]], out[0])
np.testing.assert_equal(6, out[1])
np.testing.assert_equal([[0, 1, 2]] * 2, out[2])
np.testing.assert_equal(3, out[3])
np.testing.assert_equal([[0, 0, 0], [1, 1, 1]], out[4])
np.testing.assert_equal(2, out[5])
np.testing.assert_equal(1, out[6])
if __name__ == '__main__':
unittest.main()
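# Note (illustrative, not used by the test): for the 1-D launch above the
# indexing intrinsics satisfy global_id == group_id * local_size + local_id,
# which is what the expected arrays encode, e.g. group 1, lane 2 with
# local_size 3 gives global id 5.
def cpu_global_id(group_id, local_id, local_size):
    return group_id * local_size + local_id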
import numpy as np
from numba import roc, intp
import unittest
WAVESIZE = 64
@roc.jit(device=True)
def wave_reduce(val):
tid = roc.get_local_id(0)
laneid = tid % WAVESIZE
width = WAVESIZE // 2
while width:
if laneid < width:
val[laneid] += val[laneid + width]
val[laneid + width] = -1 # debug
roc.wavebarrier()
width = width // 2
# First thread has the result
roc.wavebarrier()
return val[0]
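# Host-side sketch (illustrative) of the tree reduction wave_reduce performs:
# at each step the lower half of the wave accumulates the upper half, halving
# the active width until lane 0 holds the wave's sum.
def cpu_wave_reduce(values):
    values = list(values)          # one value per lane, length WAVESIZE
    width = WAVESIZE // 2
    while width:
        for lane in range(width):
            values[lane] += values[lane + width]
        width //= 2
    return values[0]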
@roc.jit
def kernel_warp_reduce(inp, out):
idx = roc.get_group_id(0)
val = inp[idx]
out[idx] = wave_reduce(val)
@roc.jit
def kernel_flat_reduce(inp, out):
out[0] = wave_reduce(inp)
class TestReduction(unittest.TestCase):
def template_wave_reduce_int(self, dtype):
numblk = 2
inp = np.arange(numblk * WAVESIZE, dtype=dtype).reshape(numblk, WAVESIZE)
inp_cpy = np.copy(inp)
out = np.zeros((numblk,))
kernel_warp_reduce[numblk, WAVESIZE](inp, out)
np.testing.assert_equal(out, inp_cpy.sum(axis=1))
def test_wave_reduce_intp(self):
self.template_wave_reduce_int(np.intp)
def test_wave_reduce_int32(self):
self.template_wave_reduce_int(np.int32)
def template_wave_reduce_real(self, dtype):
numblk = 2
inp = np.linspace(0, 1, numblk * WAVESIZE).astype(dtype)
inp = inp.reshape(numblk, WAVESIZE)
inp_cpy = np.copy(inp)
out = np.zeros((numblk,))
kernel_warp_reduce[numblk, WAVESIZE](inp, out)
np.testing.assert_allclose(out, inp_cpy.sum(axis=1))
def test_wave_reduce_float64(self):
self.template_wave_reduce_real(np.float64)
def test_wave_reduce_float32(self):
self.template_wave_reduce_real(np.float32)
def test_flat_reduce(self):
inp = np.arange(WAVESIZE) # destroyed in kernel
out = np.zeros((1,))
kernel_flat_reduce[1, WAVESIZE](inp, out)
np.testing.assert_allclose(out[0], np.arange(WAVESIZE).sum())
if __name__ == '__main__':
unittest.main()
import numpy as np
from numba import roc, intp, int32
import unittest
@roc.jit(device=True)
def device_scan_generic(tid, data):
"""Inclusive prefix sum within a single block
Requires tid should have range [0, data.size) and data.size must be
power of 2.
"""
n = data.size
# Upsweep
offset = 1
d = n // 2
while d > 0:
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
if tid < d:
ai = offset * (2 * tid + 1) - 1
bi = offset * (2 * tid + 2) - 1
data[bi] += data[ai]
offset *= 2
d //= 2
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
prefixsum = data[n - 1]
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
if tid == 0:
data[n - 1] = 0
# Downsweep
d = 1
offset = n
while d < n:
offset //= 2
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
if tid < d:
ai = offset * (2 * tid + 1) - 1
bi = offset * (2 * tid + 2) - 1
tmp = data[ai]
data[ai] = data[bi]
data[bi] += tmp
d *= 2
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
return prefixsum
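# Sequential sketch (not used by the tests) of the up-sweep/down-sweep scan
# that device_scan_generic performs cooperatively: the up-sweep builds partial
# sums in place, the last element (the block total) is captured and zeroed,
# and the down-sweep turns the tree into an exclusive prefix sum.
def cpu_scan_generic(data):
    n = len(data)
    # Up-sweep (reduce)
    offset = 1
    d = n // 2
    while d > 0:
        for tid in range(d):
            ai = offset * (2 * tid + 1) - 1
            bi = offset * (2 * tid + 2) - 1
            data[bi] += data[ai]
        offset *= 2
        d //= 2
    total = data[n - 1]
    data[n - 1] = 0
    # Down-sweep
    d = 1
    offset = n
    while d < n:
        offset //= 2
        for tid in range(d):
            ai = offset * (2 * tid + 1) - 1
            bi = offset * (2 * tid + 2) - 1
            data[ai], data[bi] = data[bi], data[ai] + data[bi]
        d *= 2
    return total  # data now holds the exclusive prefix sum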
_WARPSIZE = 64
@roc.jit(device=True)
def warp_scan(tid, temp, inclusive):
"""Intra-warp scan
Note
----
Assume all threads are in lockstep
"""
roc.wavebarrier()
lane = tid & (_WARPSIZE - 1)
if lane >= 1:
temp[tid] += temp[tid - 1]
roc.wavebarrier()
if lane >= 2:
temp[tid] += temp[tid - 2]
roc.wavebarrier()
if lane >= 4:
temp[tid] += temp[tid - 4]
roc.wavebarrier()
if lane >= 8:
temp[tid] += temp[tid - 8]
roc.wavebarrier()
if lane >= 16:
temp[tid] += temp[tid - 16]
roc.wavebarrier()
if lane >= 32:
temp[tid] += temp[tid - 32]
roc.wavebarrier()
if inclusive:
return temp[tid]
else:
return temp[tid - 1] if lane > 0 else 0
@roc.jit(device=True)
def device_scan(tid, data, temp, inclusive):
"""
Args
----
tid:
thread id
data: scalar
input for tid
temp: shared memory for temporary work
"""
lane = tid & (_WARPSIZE - 1)
warpid = tid >> 6
# Preload
temp[tid] = data
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
# Scan warps in parallel
warp_scan_res = warp_scan(tid, temp, inclusive)
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
# Get partial result
if lane == (_WARPSIZE - 1):
temp[warpid] = temp[tid]
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
# Scan the partial results
if warpid == 0:
warp_scan(tid, temp, True)
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
# Accumulate scanned partial results
if warpid > 0:
warp_scan_res += temp[warpid - 1]
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
# Output
if tid == temp.size - 1:
# Last thread computes prefix sum
if inclusive:
temp[0] = warp_scan_res
else:
temp[0] = warp_scan_res + data
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
# Load prefixsum
prefixsum = temp[0]
roc.barrier(roc.CLK_GLOBAL_MEM_FENCE)
return warp_scan_res, prefixsum
@roc.jit(device=True)
def shuffle_up(val, width):
tid = roc.get_local_id(0)
roc.wavebarrier()
idx = (tid + width) % _WARPSIZE
res = roc.ds_permute(idx, val)
return res
def make_inclusive_scan(dtype):
@roc.jit(device=True)
def shuf_wave_inclusive_scan(val):
tid = roc.get_local_id(0)
lane = tid & (_WARPSIZE - 1)
roc.wavebarrier()
shuf = shuffle_up(val, 1)
if lane >= 1:
val = dtype(val + shuf)
roc.wavebarrier()
shuf = shuffle_up(val, 2)
if lane >= 2:
val = dtype(val + shuf)
roc.wavebarrier()
shuf = shuffle_up(val, 4)
if lane >= 4:
val = dtype(val + shuf)
roc.wavebarrier()
shuf = shuffle_up(val, 8)
if lane >= 8:
val = dtype(val + shuf)
roc.wavebarrier()
shuf = shuffle_up(val, 16)
if lane >= 16:
val = dtype(val + shuf)
roc.wavebarrier()
shuf = shuffle_up(val, 32)
if lane >= 32:
val = dtype(val + shuf)
roc.wavebarrier()
return val
return shuf_wave_inclusive_scan
shuf_wave_inclusive_scan_int32 = make_inclusive_scan(int32)
@roc.jit(device=True)
def shuf_device_inclusive_scan(data, temp):
"""
Args
----
data: scalar
input for tid
temp: shared memory for temporary work, requires at least
threadcount/wavesize storage
"""
tid = roc.get_local_id(0)
lane = tid & (_WARPSIZE - 1)
warpid = tid >> 6
# Scan warps in parallel
warp_scan_res = shuf_wave_inclusive_scan_int32(data)
roc.barrier()
# Store partial sum into shared memory
if lane == (_WARPSIZE - 1):
temp[warpid] = warp_scan_res
roc.barrier()
# Scan the partial sum by first wave
if warpid == 0:
temp[lane] = shuf_wave_inclusive_scan_int32(temp[lane])
roc.barrier()
# Get block sum for each wave
blocksum = 0 # first wave is 0
if warpid > 0:
blocksum = temp[warpid - 1]
return warp_scan_res + blocksum
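# Host-side sketch (illustrative) of the block-wide inclusive scan that
# shuf_device_inclusive_scan builds on the device: scan each 64-lane wave
# independently, then add the running total of all preceding waves as a carry.
def cpu_block_inclusive_scan(block, wavesize=_WARPSIZE):
    block = np.asarray(block)
    waves = np.cumsum(block.reshape(-1, wavesize), axis=1)  # per-wave scan
    carries = np.cumsum(waves[:, -1])                       # scanned wave totals
    waves[1:] += carries[:-1, None]                         # carry into later waves
    return waves.reshape(block.shape)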
class TestScan(unittest.TestCase):
def test_single_block(self):
@roc.jit
def scan_block(data, sums):
sm_data = roc.shared.array(64, dtype=intp)
tid = roc.get_local_id(0)
gid = roc.get_global_id(0)
blkid = roc.get_group_id(0)
sm_data[tid] = data[gid]
prefixsum = device_scan_generic(tid, sm_data)
data[gid] = sm_data[tid]
if tid == 0:
sums[blkid] = prefixsum
data = np.random.randint(0, 4, size=64).astype(np.intp)
expected = data.cumsum()
sums = np.zeros(1, dtype=np.intp)
scan_block[1, 64](data, sums)
np.testing.assert_equal(expected[:-1], data[1:])
self.assertEqual(expected[-1], sums[0])
self.assertEqual(0, data[0])
def test_multi_block(self):
@roc.jit
def scan_block(data, sums):
sm_data = roc.shared.array(64, dtype=intp)
tid = roc.get_local_id(0)
gid = roc.get_global_id(0)
blkid = roc.get_group_id(0)
sm_data[tid] = data[gid]
prefixsum = device_scan_generic(tid, sm_data)
data[gid] = sm_data[tid]
if tid == 0:
sums[blkid] = prefixsum
nd_data = np.random.randint(0, 4, size=3 * 64).astype(
np.intp).reshape(3, 64)
nd_expected = nd_data.cumsum(axis=1)
sums = np.zeros(3, dtype=np.intp)
scan_block[3, 64](nd_data.ravel(), sums)
for nd in range(nd_expected.shape[0]):
expected = nd_expected[nd]
data = nd_data[nd]
np.testing.assert_equal(expected[:-1], data[1:])
self.assertEqual(expected[-1], sums[nd])
self.assertEqual(0, data[0])
def test_multi_large_block(self):
@roc.jit
def scan_block(data, sums):
sm_data = roc.shared.array(128, dtype=intp)
tid = roc.get_local_id(0)
gid = roc.get_global_id(0)
blkid = roc.get_group_id(0)
sm_data[tid] = data[gid]
prefixsum = device_scan_generic(tid, sm_data)
data[gid] = sm_data[tid]
sums[blkid, tid] = prefixsum
nd_data = np.random.randint(0, 4, size=3 * 128).astype(
np.intp).reshape(3, 128)
nd_expected = nd_data.cumsum(axis=1)
sums = np.zeros((3, 128), dtype=np.intp)
scan_block[3, 128](nd_data.ravel(), sums)
for nd in range(nd_expected.shape[0]):
expected = nd_expected[nd]
data = nd_data[nd]
np.testing.assert_equal(expected[:-1], data[1:])
np.testing.assert_equal(expected[-1], sums[nd])
self.assertEqual(0, data[0])
class TestFasterScan(unittest.TestCase):
def test_single_block(self):
@roc.jit
def scan_block(data, sums):
sm_data = roc.shared.array(64, dtype=intp)
tid = roc.get_local_id(0)
gid = roc.get_global_id(0)
blkid = roc.get_group_id(0)
scanval, prefixsum = device_scan(tid, data[gid], sm_data,
False)
data[gid] = scanval
if tid == 0:
sums[blkid] = prefixsum
data = np.random.randint(0, 4, size=64).astype(np.intp)
expected = data.cumsum()
sums = np.zeros(1, dtype=np.intp)
scan_block[1, 64](data, sums)
np.testing.assert_equal(expected[:-1], data[1:])
self.assertEqual(expected[-1], sums[0])
self.assertEqual(0, data[0])
def test_single_larger_block(self):
@roc.jit
def scan_block(data, sums):
sm_data = roc.shared.array(256, dtype=intp)
tid = roc.get_local_id(0)
gid = roc.get_global_id(0)
blkid = roc.get_group_id(0)
scanval, prefixsum = device_scan(tid, data[gid], sm_data,
False)
data[gid] = scanval
if tid == 0:
sums[blkid] = prefixsum
data = np.random.randint(0, 4, size=256).astype(np.intp)
expected = data.cumsum()
sums = np.zeros(1, dtype=np.intp)
scan_block[1, 256](data, sums)
np.testing.assert_equal(expected[:-1], data[1:])
print(data)
print(sums)
self.assertEqual(expected[-1], sums[0])
self.assertEqual(0, data[0])
def test_multi_large_block(self):
@roc.jit
def scan_block(data, sums):
sm_data = roc.shared.array(128, dtype=intp)
tid = roc.get_local_id(0)
gid = roc.get_global_id(0)
blkid = roc.get_group_id(0)
scanval, prefixsum = device_scan(tid, data[gid], sm_data,
False)
data[gid] = scanval
sums[blkid, tid] = prefixsum
nd_data = np.random.randint(0, 4, size=3 * 128).astype(
np.intp).reshape(3, 128)
nd_expected = nd_data.cumsum(axis=1)
sums = np.zeros((3, 128), dtype=np.intp)
scan_block[3, 128](nd_data.ravel(), sums)
for nd in range(nd_expected.shape[0]):
expected = nd_expected[nd]
data = nd_data[nd]
np.testing.assert_equal(expected[:-1], data[1:])
np.testing.assert_equal(expected[-1], sums[nd])
self.assertEqual(0, data[0])
class TestShuffleScan(unittest.TestCase):
def test_shuffle_ds_permute(self):
@roc.jit
def foo(inp, mask, out):
tid = roc.get_local_id(0)
out[tid] = roc.ds_permute(inp[tid], mask[tid])
inp = np.arange(64, dtype=np.int32)
np.random.seed(0)
for i in range(10):
mask = np.random.randint(0, inp.size, inp.size).astype(np.int32)
out = np.zeros_like(inp)
foo[1, 64](inp, mask, out)
np.testing.assert_equal(inp[mask], out)
def test_shuffle_up(self):
@roc.jit
def foo(inp, out):
gid = roc.get_global_id(0)
out[gid] = shuffle_up(inp[gid], 1)
inp = np.arange(128, dtype=np.int32)
out = np.zeros_like(inp)
foo[1, 128](inp, out)
inp = inp.reshape(2, 64)
out = out.reshape(inp.shape)
for i in range(out.shape[0]):
np.testing.assert_equal(inp[0, :-1], out[0, 1:])
np.testing.assert_equal(inp[0, -1], out[0, 0])
def test_shuf_wave_inclusive_scan(self):
@roc.jit
def foo(inp, out):
gid = roc.get_global_id(0)
out[gid] = shuf_wave_inclusive_scan_int32(inp[gid])
inp = np.arange(64, dtype=np.int32)
out = np.zeros_like(inp)
foo[1, 64](inp, out)
np.testing.assert_equal(inp.cumsum(), out)
def test_shuf_device_inclusive_scan(self):
@roc.jit
def foo(inp, out):
gid = roc.get_global_id(0)
temp = roc.shared.array(2, dtype=int32)
out[gid] = shuf_device_inclusive_scan(inp[gid], temp)
inp = np.arange(128, dtype=np.int32)
out = np.zeros_like(inp)
foo[1, inp.size](inp, out)
np.testing.assert_equal(np.cumsum(inp), out)
if __name__ == '__main__':
unittest.main()
import numpy as np
from numba import roc
from numba.roc.hsadrv.error import HsaKernelLaunchError
import unittest
class TestSimple(unittest.TestCase):
def test_array_access(self):
magic_token = 123
@roc.jit
def udt(output):
output[0] = magic_token
out = np.zeros(1, dtype=np.intp)
udt[1, 1](out)
self.assertEqual(out[0], magic_token)
def test_array_access_2d(self):
magic_token = 123
@roc.jit
def udt(output):
for i in range(output.shape[0]):
for j in range(output.shape[1]):
output[i, j] = magic_token
out = np.zeros((10, 10), dtype=np.intp)
udt[1, 1](out)
np.testing.assert_equal(out, magic_token)
def test_array_access_3d(self):
magic_token = 123
@roc.jit
def udt(output):
for i in range(output.shape[0]):
for j in range(output.shape[1]):
for k in range(output.shape[2]):
output[i, j, k] = magic_token
out = np.zeros((10, 10, 10), dtype=np.intp)
udt[1, 1](out)
np.testing.assert_equal(out, magic_token)
def test_global_id(self):
@roc.jit
def udt(output):
global_id = roc.get_global_id(0)
output[global_id] = global_id
# Allocate extra space to track bad indexing
out = np.zeros(100 + 2, dtype=np.intp)
udt[10, 10](out[1:-1])
np.testing.assert_equal(out[1:-1], np.arange(100))
self.assertEqual(out[0], 0)
self.assertEqual(out[-1], 0)
def test_local_id(self):
@roc.jit
def udt(output):
global_id = roc.get_global_id(0)
local_id = roc.get_local_id(0)
output[global_id] = local_id
# Allocate extra space to track bad indexing
out = np.zeros(100 + 2, dtype=np.intp)
udt[10, 10](out[1:-1])
subarr = out[1:-1]
for parted in np.split(subarr, 10):
np.testing.assert_equal(parted, np.arange(10))
self.assertEqual(out[0], 0)
self.assertEqual(out[-1], 0)
def test_group_id(self):
@roc.jit
def udt(output):
global_id = roc.get_global_id(0)
group_id = roc.get_group_id(0)
output[global_id] = group_id + 1
# Allocate extra space to track bad indexing
out = np.zeros(100 + 2, dtype=np.intp)
udt[10, 10](out[1:-1])
subarr = out[1:-1]
for i, parted in enumerate(np.split(subarr, 10), start=1):
np.testing.assert_equal(parted, i)
self.assertEqual(out[0], 0)
self.assertEqual(out[-1], 0)
def test_workdim(self):
@roc.jit
def udt(output):
global_id = roc.get_global_id(0)
workdim = roc.get_work_dim()
output[global_id] = workdim
out = np.zeros(10, dtype=np.intp)
udt[1, 10](out)
np.testing.assert_equal(out, 1)
@roc.jit
def udt2(output):
g0 = roc.get_global_id(0)
g1 = roc.get_global_id(1)
output[g0, g1] = roc.get_work_dim()
out = np.zeros((2, 5), dtype=np.intp)
udt2[(1, 1), (2, 5)](out)
np.testing.assert_equal(out, 2)
def test_empty_kernel(self):
@roc.jit
def udt():
pass
udt[1, 1]()
def test_workgroup_oversize(self):
@roc.jit
def udt():
pass
with self.assertRaises(HsaKernelLaunchError) as raises:
udt[1, 2**30]()
self.assertIn("Try reducing group-size", str(raises.exception))
if __name__ == '__main__':
unittest.main()