Commit 9b32b4b1 authored by Catheriany's avatar Catheriany
Browse files

Merge remote-tracking branch 'origin/main' into issue/150

parents 15bcbdfc 4799ddbf
from ast import List
import numpy as np
import gguf
from typing import List
from numpy.lib.stride_tricks import as_strided
from .. import InfiniopTestWriter, InfiniopTestCase, np_dtype_to_ggml, gguf_strides, contiguous_gguf_strides
def add(
a: np.ndarray,
b: np.ndarray,
):
return a + b
def process_tensor(a, b, stride_a=None, stride_b=None):
def normalize_stride(tensor, stride):
if stride:
slices = tuple(slice(0, 1) if s == 0 else slice(None) for s in stride)
return tensor[slices]
else:
return tensor
a_unique = normalize_stride(a, stride_a)
b_unique = normalize_stride(b, stride_b)
return a_unique, b_unique
class AddTestCase(InfiniopTestCase):
def __init__(
self,
a: np.ndarray,
shape_a: List[int] | None,
stride_a: List[int] | None,
b: np.ndarray,
shape_b: List[int] | None,
stride_b: List[int] | None,
c: np.ndarray,
shape_c: List[int] | None,
stride_c: List[int] | None,
):
super().__init__("add")
self.a = a
self.shape_a = shape_a
self.stride_a = stride_a
self.b = b
self.shape_b = shape_b
self.stride_b = stride_b
self.c = c
self.shape_c = shape_c
self.stride_c = stride_c
def write_test(self, test_writer: "InfiniopTestWriter"):
super().write_test(test_writer)
if self.shape_a is not None:
test_writer.add_array(test_writer.gguf_key("a.shape"), self.shape_a)
if self.shape_b is not None:
test_writer.add_array(test_writer.gguf_key("b.shape"), self.shape_b)
if self.shape_c is not None:
test_writer.add_array(test_writer.gguf_key("c.shape"), self.shape_c)
if self.stride_a is not None:
test_writer.add_array(test_writer.gguf_key("a.strides"), gguf_strides(*self.stride_a))
if self.stride_b is not None:
test_writer.add_array(test_writer.gguf_key("b.strides"), gguf_strides(*self.stride_b))
test_writer.add_array(
test_writer.gguf_key("c.strides"),
gguf_strides(*self.stride_c if self.stride_c is not None else contiguous_gguf_strides(self.shape_c))
)
test_writer.add_tensor(
test_writer.gguf_key("a"), self.a, raw_dtype=np_dtype_to_ggml(self.a.dtype)
)
test_writer.add_tensor(
test_writer.gguf_key("b"), self.b, raw_dtype=np_dtype_to_ggml(self.b.dtype)
)
test_writer.add_tensor(
test_writer.gguf_key("c"), self.c, raw_dtype=np_dtype_to_ggml(self.c.dtype)
)
ans = add(
self.a.astype(np.float64),
self.b.astype(np.float64),
)
test_writer.add_tensor(
test_writer.gguf_key("ans"), ans, raw_dtype=gguf.GGMLQuantizationType.F64
)
if __name__ == "__main__":
test_writer = InfiniopTestWriter("add.gguf")
test_cases = []
# ==============================================================================
# Configuration (Internal Use Only)
# ==============================================================================
# These are not meant to be imported from other modules
_TEST_CASES_ = [
# shape, a_stride, b_stride, c_stride
((13, 4), None, None, None),
((13, 4), (10, 1), (10, 1), (10, 1)),
((13, 4), (0, 1), None, None),
((13, 4, 4), None, None, None),
((13, 4, 4), (20, 4, 1), (20, 4, 1), (20, 4, 1)),
((13, 4, 4), (4, 0, 1), (0, 4, 1), None),
((16, 5632), None, None, None),
((16, 5632), (13312, 1), (13312, 1), (13312, 1)),
((4, 4, 5632), None, None, None),
((4, 4, 5632), (45056, 5632, 1), (45056, 5632, 1), (45056, 5632, 1)),
]
_TENSOR_DTYPES_ = [np.float32, np.float16]
for dtype in _TENSOR_DTYPES_:
for shape, stride_a, stride_b, stride_c in _TEST_CASES_:
a = np.random.rand(*shape).astype(dtype)
b = np.random.rand(*shape).astype(dtype)
c = np.empty(tuple(0 for _ in shape), dtype=dtype)
a, b = process_tensor(a, b, stride_a, stride_b)
if stride_c is None:
stride_c = contiguous_gguf_strides(shape)
test_case = AddTestCase(
a=a,
shape_a=shape,
stride_a=stride_a,
b=b,
shape_b=shape,
stride_b=stride_b,
c=c,
shape_c=shape,
stride_c=stride_c,
)
test_cases.append(test_case)
test_writer.add_tests(test_cases)
test_writer.save()
\ No newline at end of file
import numpy as np
import gguf
from typing import List, Optional, Tuple
from .. import InfiniopTestWriter, InfiniopTestCase, np_dtype_to_ggml, gguf_strides
def clip(
x: np.ndarray,
min_val: np.ndarray,
max_val: np.ndarray,
) -> np.ndarray:
"""
Clip the values in input tensor x to the range [min_val, max_val].
Args:
x: Input tensor
min_val: Tensor with minimum values (same shape as x)
max_val: Tensor with maximum values (same shape as x)
Returns:
Clipped tensor with the same shape as x
"""
return np.maximum(np.minimum(x, max_val), min_val)
def random_tensor(shape, dtype):
"""
Generate a random tensor with values in the range [-2, 2].
Args:
shape: Shape of the tensor
dtype: Data type of the tensor
Returns:
Random tensor with the specified shape and dtype
"""
return (np.random.rand(*shape).astype(dtype) * 4.0 - 2.0)
class ClipTestCase(InfiniopTestCase):
"""
Test case for the Clip operator.
"""
def __init__(
self,
x: np.ndarray,
x_stride: Optional[List[int]],
min_val: np.ndarray,
min_stride: Optional[List[int]],
max_val: np.ndarray,
max_stride: Optional[List[int]],
y: np.ndarray,
y_stride: Optional[List[int]],
):
super().__init__("clip")
self.x = x
self.x_stride = x_stride
self.min_val = min_val
self.min_stride = min_stride
self.max_val = max_val
self.max_stride = max_stride
self.y = y
self.y_stride = y_stride
def write_test(self, test_writer: "InfiniopTestWriter"):
super().write_test(test_writer)
# Add strides as arrays if they exist
if self.x_stride is not None:
test_writer.add_array(test_writer.gguf_key("x.strides"), self.x_stride)
if self.min_stride is not None:
test_writer.add_array(test_writer.gguf_key("min_val.strides"), self.min_stride)
if self.max_stride is not None:
test_writer.add_array(test_writer.gguf_key("max_val.strides"), self.max_stride)
if self.y_stride is not None:
test_writer.add_array(test_writer.gguf_key("y.strides"), self.y_stride)
# Add tensors to the test
test_writer.add_tensor(
test_writer.gguf_key("x"),
self.x,
raw_dtype=np_dtype_to_ggml(self.x.dtype)
)
test_writer.add_tensor(
test_writer.gguf_key("min_val"),
self.min_val,
raw_dtype=np_dtype_to_ggml(self.min_val.dtype)
)
test_writer.add_tensor(
test_writer.gguf_key("max_val"),
self.max_val,
raw_dtype=np_dtype_to_ggml(self.max_val.dtype)
)
test_writer.add_tensor(
test_writer.gguf_key("y"),
self.y,
raw_dtype=np_dtype_to_ggml(self.y.dtype)
)
# Calculate the expected result
ans = clip(
self.x.astype(np.float64),
self.min_val.astype(np.float64),
self.max_val.astype(np.float64)
)
# Add the expected result to the test
test_writer.add_tensor(
test_writer.gguf_key("ans"),
ans,
raw_dtype=gguf.GGMLQuantizationType.F64
)
if __name__ == "__main__":
test_writer = InfiniopTestWriter("clip.gguf")
# Create test cases for different shapes, strides, and data types
test_cases = []
# Test case shapes
shapes = [
(10,), # 1D tensor
(5, 10), # 2D tensor
(2, 3, 4), # 3D tensor
(7, 13), # Prime dimensions
(1, 1), # Minimum shape
(100, 100), # Large shape
(16, 16, 16), # Large 3D
]
# Test case min/max values
min_max_values = [
(-1.0, 1.0), # Standard range
(0.0, 2.0), # Positive range
(-2.0, 0.0), # Negative range
(-1000.0, 1000.0), # Large range
(-0.001, 0.001), # Small range
(0.0, 0.0), # min=max
]
# Data types to test
dtypes = [np.float16, np.float32, np.float64]
# Generate test cases with contiguous tensors
for shape in shapes:
for min_val, max_val in min_max_values:
for dtype in dtypes:
x = random_tensor(shape, dtype)
min_tensor = np.full(shape, min_val, dtype=dtype)
max_tensor = np.full(shape, max_val, dtype=dtype)
y = np.zeros(shape, dtype=dtype)
test_cases.append(
ClipTestCase(
x=x,
x_stride=None,
min_val=min_tensor,
min_stride=None,
max_val=max_tensor,
max_stride=None,
y=y,
y_stride=None
)
)
# Generate test cases with strided tensors (for 2D shapes only)
for shape in [s for s in shapes if len(s) == 2]:
for dtype in dtypes:
# Row-major stride
row_stride = gguf_strides(shape[1], 1)
# Column-major stride
col_stride = gguf_strides(1, shape[0])
# Test case with row-major input and output
x = random_tensor(shape, dtype)
min_tensor = np.full(shape, -1.0, dtype=dtype)
max_tensor = np.full(shape, 1.0, dtype=dtype)
y = np.zeros(shape, dtype=dtype)
test_cases.append(
ClipTestCase(
x=x,
x_stride=row_stride,
min_val=min_tensor,
min_stride=row_stride,
max_val=max_tensor,
max_stride=row_stride,
y=y,
y_stride=row_stride
)
)
# Test case with column-major input and output
x = random_tensor(shape, dtype)
min_tensor = np.full(shape, -1.0, dtype=dtype)
max_tensor = np.full(shape, 1.0, dtype=dtype)
y = np.zeros(shape, dtype=dtype)
test_cases.append(
ClipTestCase(
x=x,
x_stride=col_stride,
min_val=min_tensor,
min_stride=col_stride,
max_val=max_tensor,
max_stride=col_stride,
y=y,
y_stride=col_stride
)
)
# Test case with different strides for input and output
x = random_tensor(shape, dtype)
min_tensor = np.full(shape, -1.0, dtype=dtype)
max_tensor = np.full(shape, 1.0, dtype=dtype)
y = np.zeros(shape, dtype=dtype)
test_cases.append(
ClipTestCase(
x=x,
x_stride=row_stride,
min_val=min_tensor,
min_stride=row_stride,
max_val=max_tensor,
max_stride=row_stride,
y=y,
y_stride=col_stride
)
)
# Add all test cases to the writer
test_writer.add_tests(test_cases)
# Save the test cases to a GGUF file
test_writer.save()
print(f"Generated {len(test_cases)} test cases for the Clip operator")
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment