Commit e307f686 authored by pengcheng888's avatar pengcheng888
Browse files

issue-606 添加nn.Linear,nn.RMSNorm,nn.Embedding类的python实现和测试

parent 74934cdf
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["causal_softmax"]
def causal_softmax(input: Tensor, out=None) -> Tensor:
r"""Apply a causal softmax function."""
......
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["embedding"]
def embedding(
input: Tensor,
......
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["linear"]
def linear(input: Tensor, weight: Tensor, bias=None, *, out=None) -> Tensor:
r"""Applies a linear transformation to the incoming data: y=xA^T+b."""
......
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["random_sample"]
def random_sample(
logits: Tensor,
......
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["rms_norm"]
def rms_norm(
input: Tensor,
......
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["rope", "RopeAlgo"]
class RopeAlgo:
r"""Different types of RoPE algorithms."""
......
......@@ -2,8 +2,6 @@ import infinicore
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["silu"]
def silu(input: Tensor, inplace: bool = False, *, out=None) -> Tensor:
r"""Apply the Sigmoid Linear Unit (SiLU) function, element-wise."""
......
from infinicore.lib import _infinicore
from infinicore.tensor import Tensor
__all__ = ["swiglu"]
def swiglu(input: Tensor, other: Tensor, *, out=None):
r"""Apply the Swish-Gated Linear Unit (SwiGLU) function, element-wise."""
......
from .container import InfiniCoreModuleList as ModuleList
from .linear import Linear
from .module import InfiniCoreModule as Module
from .normalization import RMSNorm
from .sparse import Embedding
# Public API of the nn package. (A stale duplicate __all__ line from a merge
# left two definitions; only the complete list is kept.)
__all__ = ["Linear", "RMSNorm", "Embedding", "ModuleList", "Module"]
import infinicore
from infinicore.nn import functional as F
from ...tensor import Tensor
from ..parameter import InfiniCoreParameter as Parameter
from .module import InfiniCoreModule as Module
class Linear(Module):
    r"""Applies an affine linear transformation to the incoming data: :math:`y = xA^T + b`.

    Args:
        in_features (int): size of each input sample
        out_features (int): size of each output sample
        bias (bool): If set to ``False``, the layer will not learn an additive bias.
            Default: ``False``

    Shape:
        - Input: :math:`(*, H_{in})` where :math:`*` means any number of dimensions,
          :math:`H_{in}` = in_features.
        - Output: :math:`(*, H_{out})` where all but the last dimension are the same
          shape as the input and :math:`H_{out}` = out_features.

    Attributes:
        weight (Tensor): the weights of the module of shape (out_features, in_features).
        bias (Tensor): the bias of the module of shape (out_features).
    """

    __constants__ = ["in_features", "out_features"]
    in_features: int
    out_features: int
    weight: Tensor

    def __init__(
        self,
        in_features: int,
        out_features: int,
        bias: bool = False,
        device=None,
        dtype=None,
    ) -> None:
        # Fall back to CPU device 0 / float32 when the caller specifies neither.
        factory_kwargs = {
            "device": infinicore.device("cpu", 0) if device is None else device,
            "dtype": infinicore.float32 if dtype is None else dtype,
        }
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        # Weight is stored as (out_features, in_features), matching torch.nn.Linear.
        self.weight = Parameter(
            infinicore.empty([out_features, in_features], **factory_kwargs)
        )
        if bias:
            self.bias = Parameter(infinicore.empty([out_features], **factory_kwargs))
        else:
            # Register the attribute anyway so `self.bias` always exists (as None).
            self.register_parameter("bias", None)

    def forward(self, input: Tensor) -> Tensor:
        """Apply the linear transformation; bias may be None."""
        return F.linear(input, self.weight, self.bias)

    def extra_repr(self) -> str:
        """Summary string shown in the module's repr."""
        return f"in_features={self.in_features}, out_features={self.out_features}, bias={self.bias is not None}"
import numbers
from typing import Optional, Union
import infinicore
from infinicore.nn import functional as F
from ...tensor import Tensor
from ..parameter import InfiniCoreParameter as Parameter
from .module import InfiniCoreModule as Module
class RMSNorm(Module):
    r"""Applies Root Mean Square Layer Normalization over a mini-batch of inputs.

    The statistic is computed over the trailing dimension(s) given by
    ``normalized_shape`` (one-dimensional here).

    Args:
        normalized_shape (int or list): trailing input dimension(s) to normalize
            over; an ``int`` is treated as a one-element shape.
        eps (float): value added to the denominator for numerical stability.

    Shape:
        - Input: (N, *)
        - Output: (N, *) (same shape as input)
    """

    __constants__ = ["normalized_shape", "eps"]
    normalized_shape: tuple[int]
    eps: Optional[float]

    def __init__(
        self,
        normalized_shape: Union[int, list[int]],
        eps=1e-6,
        elementwise_affine=True,
        device=None,
        dtype=None,
    ) -> None:
        # Default placement: float32 on CPU device 0.
        fkw = {
            "dtype": infinicore.float32 if dtype is None else dtype,
            "device": infinicore.device("cpu", 0) if device is None else device,
        }
        super().__init__()
        # Only the affine variant (with a learned weight) is implemented.
        assert elementwise_affine, "elementwise_affine must be true."
        shape = (
            [normalized_shape]
            if isinstance(normalized_shape, numbers.Integral)
            else list(normalized_shape)
        )
        self.normalized_shape = shape
        self.eps = eps
        self.weight = Parameter(infinicore.empty(shape, **fkw))

    def forward(self, x: Tensor) -> Tensor:
        """Normalize ``x`` over the trailing dimension and scale by the weight."""
        return F.rms_norm(x, self.normalized_shape, self.weight, self.eps)

    def extra_repr(self) -> str:
        """Summary string shown in the module's repr."""
        return f"{self.normalized_shape}, eps={self.eps}"
import infinicore
from infinicore.nn import functional as F
from ...tensor import Tensor
from ..parameter import InfiniCoreParameter as Parameter
from .module import InfiniCoreModule as Module
class Embedding(Module):
    r"""A simple lookup table that stores embeddings of a fixed dictionary and size.

    Often used to store word embeddings and retrieve them by index: the input is a
    tensor of indices and the output holds the corresponding embedding vectors.

    Args:
        num_embeddings (int): size of the dictionary of embeddings.
        embedding_dim (int): the size of each embedding vector.

    Attributes:
        weight (Tensor): the weights of the module of shape (num_embeddings, embedding_dim).

    Shape:
        - Input: (*), integer tensor of arbitrary shape containing the indices to extract.
        - Output: (*, H), where ``*`` is the input shape and H = embedding_dim.
    """

    __constants__ = ["num_embeddings", "embedding_dim"]
    num_embeddings: int
    embedding_dim: int
    weight: Tensor

    def __init__(
        self,
        num_embeddings: int,
        embedding_dim: int,
        padding_idx=None,
        max_norm=None,
        norm_type=2.0,
        scale_grad_by_freq=False,
        sparse=False,
        _weight=None,
        _freeze=False,
        device=None,
        dtype=None,
    ) -> None:
        # Default placement: float32 on CPU device 0.
        fkw = {
            "dtype": infinicore.float32 if dtype is None else dtype,
            "device": infinicore.device("cpu", 0) if device is None else device,
        }
        super().__init__()
        # The torch-compatibility arguments are accepted for signature parity
        # but not implemented; reject any non-default value.
        unsupported = (
            padding_idx is not None
            or max_norm is not None
            or scale_grad_by_freq is not False
            or sparse is not False
            or _weight is not None
            or _freeze is not False
        )
        assert not unsupported, "Unsupported parameters."
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.weight = Parameter(
            infinicore.empty([num_embeddings, embedding_dim], **fkw)
        )

    def forward(self, input: Tensor) -> Tensor:
        """Return the rows of ``weight`` selected by the indices in ``input``."""
        return F.embedding(input, self.weight)

    def extra_repr(self) -> str:
        """Summary string shown in the module's repr."""
        return f"{self.num_embeddings}, {self.embedding_dim}"
......@@ -157,5 +157,5 @@ def from_torch(torch_tensor) -> Tensor:
dtype=infini_type._underlying,
device=infini_device._underlying,
),
torch_ref=torch_tensor,
_torch_ref=torch_tensor,
)
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import torch
from framework.base import BaseOperatorTest, TensorSpec, TestCase
from framework.runner import GenericTestRunner
from framework.tensor import TensorInitializer
from framework.utils import convert_infinicore_to_torch
import infinicore
# ==============================================================================
#  Operator-specific configuration
# ==============================================================================

# Test cases format: (x_shape, weight_shape)
# weight (Tensor) – the weights of the module of shape (num_embeddings, embedding_dim).
_TEST_CASES_DATA = [
    # Basic cases: 1D, batched and nested-batch index shapes against a
    # vocabulary-sized (32000 x 2048) embedding table.
    ((1, 5), (32000, 2048)),
    ((2, 5), (32000, 2048)),
    ((2, 2, 5), (32000, 2048)),
]

# Tolerance configuration: relative tolerance per dtype (looser for low precision).
_TOLERANCE_MAP = {
    infinicore.float16: {"atol": 0, "rtol": 1e-2},
    infinicore.float32: {"atol": 0, "rtol": 1e-3},
    infinicore.bfloat16: {"atol": 0, "rtol": 5e-2},
}

# Data types to test (applied to the weight table; indices are always int64).
_TENSOR_DTYPES = [infinicore.float16, infinicore.bfloat16, infinicore.float32]
def parse_test_cases():
    """Build TestCase objects for every configured (shape, dtype) combination.

    Returns:
        list[TestCase]: one out-of-place test case per entry in
        ``_TEST_CASES_DATA`` per dtype in ``_TENSOR_DTYPES``.
    """
    test_cases = []
    strides = None  # contiguous tensors only; hoisted (loop-invariant)
    for x_shape, weight_shape in _TEST_CASES_DATA:
        # Generate test cases for all data types
        for dtype in _TENSOR_DTYPES:
            tolerance = _TOLERANCE_MAP.get(dtype, {"atol": 0, "rtol": 1e-3})
            # Indices: int64 in [1, 10000), always valid rows of the table.
            x_spec = TensorSpec.from_tensor(
                x_shape,
                strides,
                infinicore.int64,
                init_mode=TensorInitializer.RANDINT,
                low=1,
                high=10000,
                name="x",
            )
            weight_spec = TensorSpec.from_tensor(
                weight_shape, strides, dtype, name="weight"
            )
            # Out-of-place case: output is the operator's return value.
            test_cases.append(
                TestCase(
                    inputs=[x_spec, weight_spec],
                    kwargs={},
                    output_spec=None,
                    comparison_target=None,
                    tolerance=tolerance,
                    # Plain string: the original used an f-string with no
                    # placeholders (ruff F541).
                    description="nn.Embedding - OUT_OF_PLACE",
                )
            )
    return test_cases
class OpTest(BaseOperatorTest):
    """nn.Embedding test with simplified implementation"""

    def __init__(self):
        super().__init__("nn.Embedding")

    def get_test_cases(self):
        # Cases are rebuilt on every call; see parse_test_cases().
        return parse_test_cases()

    def torch_operator(self, x, weight):
        """PyTorch nn.Embedding implementation"""
        num_embeddings, embedding_dim = weight.shape
        model = torch.nn.Embedding(
            num_embeddings,
            embedding_dim,
            device=weight.device,
            dtype=weight.dtype,
        )
        # Replace the randomly initialized table with the test weight.
        params_dict = {"weight": weight}
        model.load_state_dict(params_dict)
        with torch.no_grad():
            y = model(x)
        return y

    def infinicore_operator(self, x, weight):
        """InfiniCore nn.Embedding implementation"""
        if x.device.type != "cpu":
            # Move the input (index) data onto the CPU via a torch round-trip.
            x_torch = convert_infinicore_to_torch(x)
            x_torch_cpu = x_torch.contiguous().cpu()
            x = infinicore.from_torch(x_torch_cpu)
        num_embeddings, embedding_dim = weight.shape
        model = infinicore.nn.Embedding(
            num_embeddings,
            embedding_dim,
            device=weight.device,
            dtype=weight.dtype,
        )
        # Replace the randomly initialized table with the test weight.
        params_dict = {"weight": weight}
        model.load_state_dict(params_dict)
        y = model(x)
        return y
def main():
    """Entry point: run the embedding operator test suite and exit."""
    GenericTestRunner(OpTest).run_and_exit()


if __name__ == "__main__":
    main()
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import torch
from framework.base import BaseOperatorTest, TensorSpec, TestCase
from framework.runner import GenericTestRunner
import infinicore
# ==============================================================================
#  Operator-specific configuration
# ==============================================================================

# Test cases format: (x_shape, weight_shape, bias_shape, bias)
# weight – the weights of the module of shape (out_features, in_features)
_TEST_CASES_DATA = [
    # Basic cases: small sanity shapes plus a transformer-sized projection,
    # each with and without bias.
    ((1, 10), (2, 10), (2,), True),
    ((4, 10), (2, 10), (2,), False),
    ((1, 1024), (3072, 1024), (3072,), True),
    ((5, 1024), (3072, 1024), (3072,), False),
]

# Tolerance configuration: relative tolerance per dtype (looser for low precision).
_TOLERANCE_MAP = {
    infinicore.float16: {"atol": 0, "rtol": 1e-2},
    infinicore.float32: {"atol": 0, "rtol": 1e-3},
    infinicore.bfloat16: {"atol": 0, "rtol": 5e-2},
}

# Data types to test
_TENSOR_DTYPES = [infinicore.float16, infinicore.bfloat16, infinicore.float32]
def parse_test_cases():
    """Build TestCase objects for every configured (shapes, bias flag, dtype) combo.

    Returns:
        list[TestCase]: one out-of-place test case per entry in
        ``_TEST_CASES_DATA`` per dtype in ``_TENSOR_DTYPES``.
    """
    test_cases = []
    strides = None  # contiguous tensors only; hoisted (loop-invariant)
    for x_shape, weight_shape, bias_shape, has_bias in _TEST_CASES_DATA:
        # Generate test cases for all data types
        for dtype in _TENSOR_DTYPES:
            tolerance = _TOLERANCE_MAP.get(dtype, {"atol": 0, "rtol": 1e-3})
            x_spec = TensorSpec.from_tensor(x_shape, strides, dtype, name="x")
            weight_spec = TensorSpec.from_tensor(
                weight_shape, strides, dtype, name="weight"
            )
            # The bias spec is always built; `has_bias` decides whether it is used.
            bias_spec = TensorSpec.from_tensor(bias_shape, strides, dtype, name="bias")
            # Out-of-place case: output is the operator's return value.
            test_cases.append(
                TestCase(
                    inputs=[x_spec, weight_spec, bias_spec],
                    kwargs={"has_bias": has_bias},
                    output_spec=None,
                    comparison_target=None,
                    tolerance=tolerance,
                    # Plain string: the original used an f-string with no
                    # placeholders (ruff F541).
                    description="nn.Linear - OUT_OF_PLACE",
                )
            )
    return test_cases
class InfiniNet(infinicore.nn.Module):
    """Single-layer network wrapping :class:`infinicore.nn.Linear`."""

    def __init__(
        self,
        in_features: int,
        out_features: int,
        bias: bool = False,
        device=None,
        dtype=None,
    ):
        super().__init__()
        # NOTE: the submodule must be named "l" — the tests load state dicts
        # keyed "l.weight" / "l.bias".
        self.l = infinicore.nn.Linear(
            in_features,
            out_features,
            bias=bias,
            device=device,
            dtype=dtype,
        )

    def forward(self, x):
        """Apply the single linear layer."""
        return self.l(x)
class TorchNet(torch.nn.Module):
    """Single-layer reference network wrapping :class:`torch.nn.Linear`."""

    def __init__(
        self,
        in_features: int,
        out_features: int,
        bias: bool = False,
        device=None,
        dtype=None,
    ):
        super().__init__()
        # NOTE: the submodule must be named "l" — the tests load state dicts
        # keyed "l.weight" / "l.bias".
        self.l = torch.nn.Linear(
            in_features,
            out_features,
            bias=bias,
            device=device,
            dtype=dtype,
        )

    def forward(self, x):
        """Apply the single linear layer."""
        return self.l(x)
class OpTest(BaseOperatorTest):
    """nn.Linear test with simplified implementation"""

    def __init__(self):
        super().__init__("nn.Linear")

    def get_test_cases(self):
        return parse_test_cases()

    @staticmethod
    def _state_dict(weight, bias, has_bias):
        # Shared helper: parameters keyed for the "l" submodule of both nets.
        sd = {"l.weight": weight}
        if has_bias:
            sd["l.bias"] = bias
        return sd

    def torch_operator(self, x, weight, bias, has_bias):
        """PyTorch nn.Linear implementation"""
        out_features, in_features = weight.shape
        model = TorchNet(
            in_features,
            out_features,
            bias=has_bias,
            device=weight.device,
            dtype=weight.dtype,
        )
        model.load_state_dict(self._state_dict(weight, bias, has_bias))
        with torch.no_grad():
            return model(x)

    def infinicore_operator(self, x, weight, bias, has_bias):
        """InfiniCore nn.Linear implementation"""
        out_features, in_features = weight.shape
        model = InfiniNet(
            in_features,
            out_features,
            bias=has_bias,
            device=weight.device,
            dtype=weight.dtype,
        )
        model.load_state_dict(self._state_dict(weight, bias, has_bias))
        return model(x)
def main():
    """Entry point: run the linear operator test suite and exit."""
    GenericTestRunner(OpTest).run_and_exit()


if __name__ == "__main__":
    main()
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import torch
from framework.base import BaseOperatorTest, TensorSpec, TestCase
from framework.runner import GenericTestRunner
import infinicore
# ==============================================================================
#  Operator-specific configuration
# ==============================================================================

# Test cases format: (x_shape, weight_shape,)
# weight covers the trailing (normalized) dimension of x.
_TEST_CASES_DATA = [
    # Basic cases: 2D and 3D inputs normalized over the last dimension.
    ((1, 8), (8,)),
    ((2, 3, 8), (8,)),
    ((2, 10, 64), (64,)),
]

# Tolerance configuration: relative tolerance per dtype (looser for low precision).
_TOLERANCE_MAP = {
    infinicore.float16: {"atol": 0, "rtol": 1e-2},
    infinicore.float32: {"atol": 0, "rtol": 1e-3},
    infinicore.bfloat16: {"atol": 0, "rtol": 5e-2},
}

# Data types to test
_TENSOR_DTYPES = [infinicore.float16, infinicore.bfloat16, infinicore.float32]
def parse_test_cases():
    """Build TestCase objects for every configured (shape, dtype) combination.

    Returns:
        list[TestCase]: one out-of-place test case per entry in
        ``_TEST_CASES_DATA`` per dtype in ``_TENSOR_DTYPES``.
    """
    test_cases = []
    strides = None  # contiguous tensors only; hoisted (loop-invariant)
    for x_shape, weight_shape in _TEST_CASES_DATA:
        # Generate test cases for all data types
        for dtype in _TENSOR_DTYPES:
            tolerance = _TOLERANCE_MAP.get(dtype, {"atol": 0, "rtol": 1e-3})
            x_spec = TensorSpec.from_tensor(x_shape, strides, dtype, name="x")
            weight_spec = TensorSpec.from_tensor(
                weight_shape, strides, dtype, name="weight"
            )
            # Out-of-place case: output is the operator's return value.
            test_cases.append(
                TestCase(
                    inputs=[x_spec, weight_spec],
                    kwargs={},
                    output_spec=None,
                    comparison_target=None,
                    tolerance=tolerance,
                    # Plain string: the original used an f-string with no
                    # placeholders (ruff F541).
                    description="nn.RMSNorm - OUT_OF_PLACE",
                )
            )
    return test_cases
class OpTest(BaseOperatorTest):
    """nn.RMSNorm test with simplified implementation"""

    def __init__(self):
        super().__init__("nn.RMSNorm")

    def get_test_cases(self):
        return parse_test_cases()

    def torch_operator(self, x, weight):
        """PyTorch nn.RMSNorm implementation"""
        model = torch.nn.RMSNorm(
            weight.shape,
            device=weight.device,
            dtype=weight.dtype,
        )
        # Replace the default (all-ones) scale with the test weight.
        model.load_state_dict({"weight": weight})
        with torch.no_grad():
            return model(x)

    def infinicore_operator(self, x, weight):
        """InfiniCore nn.RMSNorm implementation"""
        model = infinicore.nn.RMSNorm(
            weight.shape,
            device=weight.device,
            dtype=weight.dtype,
        )
        # Replace the default scale with the test weight.
        model.load_state_dict({"weight": weight})
        return model(x)
def main():
    """Entry point: run the RMSNorm operator test suite and exit."""
    GenericTestRunner(OpTest).run_and_exit()


if __name__ == "__main__":
    main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment