import torch
import ctypes
from ctypes import POINTER, Structure, c_int32, c_size_t, c_uint64, c_void_p, c_float
from libinfiniop import (
infiniopHandle_t,
infiniopTensorDescriptor_t,
open_lib,
to_tensor,
get_test_devices,
check_error,
rearrange_if_needed,
create_workspace,
test_operator,
get_args,
debug,
get_tolerance,
profile_operation,
)
# ==============================================================================
# Configuration (Internal Use Only)
# ==============================================================================
# These are not meant to be imported from other modules
_TEST_CASES = [
# alpha, beta, a_shape, b_shape, c_shape, a_stride, b_stride, c_stride
(1.0, 0.0, (1, 2048), (2048, 2048), (1, 2048), None, None, None),
(1.0, 0.0, (1, 2048), (2048, 2048), (1, 2048), None, None, None),
(1.0, 0.0, (2, 4, 2048), (2, 2048, 2048), (2, 4, 2048), None, None, None),
(1.0, 0.0, (2, 4, 2048), (2, 2048, 2048), (2, 4, 2048), None, None, None),
(1.0, 0.0, (1, 2048), (2048, 2048), (1, 2048), (4096, 1), (4096, 1), (4096, 1)),
(1.0, 0.0, (1, 2048), (2048, 2048), (1, 2048), (4096, 1), (4096, 1), (4096, 1)),
(1.0, 1.0, (6, 2048), (2048, 2560), (6, 2560), (2048, 1), (1, 2048), (2560, 1)),
(1.0, 1.0, (6, 2048), (2048, 2560), (6, 2560), (2048, 1), (1, 2048), (2560, 1)),
(1.0 / 8.0, 0.0, (4, 8 * 6, 64), (4, 64, 6), (4, 8 * 6, 6), None, None, None),
(1.0 / 8.0, 0.0, (4, 8 * 6, 64), (4, 64, 6), (4, 8 * 6, 6), None, None, None),
]
# Data types used for testing
_TENSOR_DTYPES = [torch.float16, torch.float32]
# Tolerance map for different data types
_TOLERANCE_MAP = {
torch.float16: {"atol": 0, "rtol": 1e-2},
torch.float32: {"atol": 0, "rtol": 1e-3},
}
DEBUG = False
PROFILE = False
NUM_PRERUN = 10
NUM_ITERATIONS = 1000
# ==============================================================================
# Definitions
# ==============================================================================
class MatmulDescriptor(Structure):
_fields_ = [("device", c_int32)]
infiniopMatmulDescriptor_t = POINTER(MatmulDescriptor)
# PyTorch implementation for matrix multiplication
def matmul(_c, beta, _a, _b, alpha):
a, b, c = _a.clone(), _b.clone(), _c.clone()
result_dtype = c.dtype
fp32_result = torch.matmul(a.to(torch.float32), b.to(torch.float32))
return alpha * fp32_result.to(result_dtype) + beta * c
# The argument list should be (lib, handle, torch_device, , dtype)
# The should keep the same order as the one specified in _TEST_CASES
def test(
lib,
handle,
torch_device,
alpha,
beta,
a_shape,
b_shape,
c_shape,
a_stride=None,
b_stride=None,
c_stride=None,
dtype=torch.float16,
):
print(
f"Testing Matmul on {torch_device} with alpha:{alpha}, beta:{beta},"
f" a_shape:{a_shape}, b_shape:{b_shape}, c_shape:{c_shape},"
f" a_stride:{a_stride}, b_stride:{b_stride}, c_stride:{c_stride}, dtype:{dtype}"
)
# Initialize tensors
a = torch.rand(a_shape, dtype=dtype).to(torch_device)
b = torch.rand(b_shape, dtype=dtype).to(torch_device)
c = torch.ones(c_shape, dtype=dtype).to(torch_device)
# Compute the PyTorch reference result
ans = matmul(c, beta, a, b, alpha)
a, b, c = [
rearrange_if_needed(tensor, stride)
for tensor, stride in zip([a, b, c], [a_stride, b_stride, c_stride])
]
a_tensor, b_tensor, c_tensor = [to_tensor(tensor, lib) for tensor in [a, b, c]]
descriptor = infiniopMatmulDescriptor_t()
check_error(
lib.infiniopCreateMatmulDescriptor(
handle,
ctypes.byref(descriptor),
c_tensor.descriptor,
a_tensor.descriptor,
b_tensor.descriptor,
)
)
# Invalidate the shape and strides in the descriptor to prevent them from being directly used by the kernel
for tensor in [a_tensor, b_tensor, c_tensor]:
tensor.destroyDesc(lib)
# Get workspace size and create workspace
workspace_size = c_uint64(0)
check_error(
lib.infiniopGetMatmulWorkspaceSize(descriptor, ctypes.byref(workspace_size))
)
workspace = create_workspace(workspace_size.value, a.device)
# Execute infiniop matmul operator
def lib_matmul():
check_error(
lib.infiniopMatmul(
descriptor,
workspace.data_ptr() if workspace is not None else None,
workspace_size.value,
c_tensor.data,
a_tensor.data,
b_tensor.data,
alpha,
beta,
None,
)
)
lib_matmul()
# Validate results
atol, rtol = get_tolerance(_TOLERANCE_MAP, dtype)
if DEBUG:
debug(c, ans, atol=atol, rtol=rtol)
assert torch.allclose(c, ans, atol=atol, rtol=rtol)
# Profiling workflow
if PROFILE:
# fmt: off
profile_operation("PyTorch", lambda: matmul(c, beta, a, b, alpha), torch_device, NUM_PRERUN, NUM_ITERATIONS)
profile_operation(" lib", lambda: lib_matmul(), torch_device, NUM_PRERUN, NUM_ITERATIONS)
# fmt: on
check_error(lib.infiniopDestroyMatmulDescriptor(descriptor))
# ==============================================================================
# Main Execution
# ==============================================================================
if __name__ == "__main__":
args = get_args()
lib = open_lib()
lib.infiniopCreateMatmulDescriptor.restype = c_int32
lib.infiniopCreateMatmulDescriptor.argtypes = [
infiniopHandle_t,
POINTER(infiniopMatmulDescriptor_t),
infiniopTensorDescriptor_t,
infiniopTensorDescriptor_t,
infiniopTensorDescriptor_t,
]
lib.infiniopGetMatmulWorkspaceSize.restype = c_int32
lib.infiniopGetMatmulWorkspaceSize.argtypes = [
infiniopMatmulDescriptor_t,
POINTER(c_size_t),
]
lib.infiniopMatmul.restype = c_int32
lib.infiniopMatmul.argtypes = [
infiniopMatmulDescriptor_t,
c_void_p,
c_uint64,
c_void_p,
c_void_p,
c_void_p,
c_float,
c_float,
c_void_p,
]
lib.infiniopDestroyMatmulDescriptor.restype = c_int32
lib.infiniopDestroyMatmulDescriptor.argtypes = [
infiniopMatmulDescriptor_t,
]
# Configure testing options
DEBUG = args.debug
PROFILE = args.profile
NUM_PRERUN = args.num_prerun
NUM_ITERATIONS = args.num_iterations
# Execute tests
for device in get_test_devices(args):
test_operator(lib, device, test, _TEST_CASES, _TENSOR_DTYPES)
print("\033[92mTest passed!\033[0m")