Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
jerrrrry
infinicore
Commits
b2e1f8b7
Commit
b2e1f8b7
authored
Nov 20, 2025
by
pengcheng888
Browse files
issue/608 - 修改functional中的rope, 添加nn.RoPE的实现和测试
parent
ed012302
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
301 additions
and
3 deletions
+301
-3
python/infinicore/nn/functional/rope.py
python/infinicore/nn/functional/rope.py
+13
-2
python/infinicore/nn/modules/__init__.py
python/infinicore/nn/modules/__init__.py
+2
-1
python/infinicore/nn/modules/rope.py
python/infinicore/nn/modules/rope.py
+92
-0
test/infinicore/nn/rope.py
test/infinicore/nn/rope.py
+194
-0
No files found.
python/infinicore/nn/functional/rope.py
View file @
b2e1f8b7
...
...
@@ -20,6 +20,16 @@ def rope(
)
->
Tensor
:
r
"""Rotary Position Embedding(RoPE)."""
bs
,
seq_len
,
num_heads
,
head_dim
=
x
.
shape
x_stride
=
x
.
stride
()
assert
seq_len
*
x_stride
[
1
]
==
x_stride
[
0
],
(
"x need to be continuous in dim=0 and dim=1"
)
x
=
x
.
view
((
bs
*
seq_len
,
num_heads
,
head_dim
))
bs
,
num
=
pos_ids
.
shape
pos_ids
=
pos_ids
.
view
((
bs
*
num
,))
if
out
is
None
:
return
Tensor
(
_infinicore
.
rope
(
...
...
@@ -29,8 +39,9 @@ def rope(
cos_table
.
_underlying
,
algo
,
)
)
)
.
view
((
bs
,
seq_len
,
num_heads
,
head_dim
))
out
=
out
.
view
((
bs
*
seq_len
,
num_heads
,
head_dim
))
_infinicore
.
rope_
(
out
.
_underlying
,
x
.
_underlying
,
...
...
@@ -39,4 +50,4 @@ def rope(
cos_table
.
_underlying
,
algo
,
)
return
out
return
out
.
view
((
bs
,
seq_len
,
num_heads
,
head_dim
))
python/infinicore/nn/modules/__init__.py
View file @
b2e1f8b7
...
...
@@ -2,6 +2,7 @@ from .container import InfiniCoreModuleList as ModuleList
from .linear import Linear
from .module import InfiniCoreModule as Module
from .normalization import RMSNorm
from .rope import RoPE
from .sparse import Embedding

# Public API of infinicore.nn.modules.
# (The scraped diff showed two consecutive __all__ assignments — the pre- and
# post-change versions; only the final one, which exports RoPE, is kept.)
__all__ = ["Linear", "RMSNorm", "Embedding", "RoPE", "ModuleList", "Module"]
python/infinicore/nn/modules/rope.py
0 → 100644
View file @
b2e1f8b7
import
numpy
as
np
import
infinicore
from
infinicore.nn
import
functional
as
F
from
...tensor
import
Tensor
from
..functional
import
RopeAlgo
from
.module
import
InfiniCoreModule
as
Module
def create_sin_cos_table_numpy(max_position, head_dim, theta=10000.0):
    """Build float32 sine/cosine RoPE lookup tables.

    Args:
        max_position: Number of positions (rows) to precompute.
        head_dim: Attention head dimension; must be even.
        theta: Base period of the rotary embeddings.

    Returns:
        Tuple ``(sin_table, cos_table)``, each a float32 ndarray of shape
        ``(max_position, head_dim // 2)``.
    """
    assert head_dim % 2 == 0, "Embedding dimension must be even."
    positions = np.arange(0, max_position)
    # Inverse frequencies: theta^(-2i/head_dim) for i in [0, head_dim/2).
    inv_freq = 1.0 / (
        theta ** (np.arange(0, head_dim, 2)[: (head_dim // 2)].astype(float) / head_dim)
    )
    # Outer product gives one angle per (position, frequency) pair.
    angles = np.outer(positions, inv_freq)
    return np.sin(angles, dtype=np.float32), np.cos(angles, dtype=np.float32)
def create_sin_cos_table(
    max_position,
    head_dim,
    theta=10000.0,
    device=None,
    dtype=None,
):
    """Return RoPE sin/cos tables as infinicore tensors.

    Builds the tables with numpy, then copies them onto the requested
    ``device`` with the requested ``dtype`` via ``infinicore.from_numpy``.
    """
    np_sin, np_cos = create_sin_cos_table_numpy(max_position, head_dim, theta)
    return (
        infinicore.from_numpy(np_sin, dtype=dtype, device=device),
        infinicore.from_numpy(np_cos, dtype=dtype, device=device),
    )
class RoPE(Module):
    r"""Rotary Position Embedding (RoPE).

    Precomputes sin/cos tables for all positions up to
    ``max_position_embeddings`` at construction time, then applies the
    rotation in :meth:`forward`.

    Args:
        max_position_embeddings (int): The maximum sequence length that this model might ever be used with.
        rope_theta (float): The base period of the RoPE embeddings.
        head_dim (int): The attention head dimension.

    Shape:
        - Input: hidden_states, ( bs, seq_len, num_heads, head_dim).
        - Output: hidden_states, ( bs, seq_len, num_heads, head_dim).

    Note:
        ``forward`` rotates the input **in place** (it calls ``F.rope`` with
        ``out=states``) and returns the same tensor object.
    """

    __constants__ = ["max_position_embeddings", "rope_theta", "head_dim"]

    max_position_embeddings: int
    rope_theta: float
    head_dim: int

    def __init__(
        self,
        max_position_embeddings: int,
        rope_theta: float,
        head_dim: int,
        device=None,
        dtype=None,
    ):
        # Defaults: CPU device 0 and float32 tables when not specified.
        factory_kwargs = {
            "device": infinicore.device("cpu", 0) if device is None else device,
            "dtype": infinicore.float32 if dtype is None else dtype,
        }
        super().__init__()
        self.max_position_embeddings = max_position_embeddings
        self.rope_theta = rope_theta
        self.head_dim = head_dim
        # Tables of shape (max_position_embeddings, head_dim // 2), built once.
        self._sin_table, self._cos_table = create_sin_cos_table(
            self.max_position_embeddings,
            head_dim=self.head_dim,
            theta=self.rope_theta,
            **factory_kwargs,
        )

    def forward(self, states: Tensor, position_ids: Tensor, algo=RopeAlgo.GPT_NEOX):
        # In-place: out=states means the caller's tensor is mutated.
        F.rope(
            states,
            position_ids,
            self._sin_table,
            self._cos_table,
            algo=algo,
            out=states,
        )
        return states
test/infinicore/nn/rope.py
0 → 100644
View file @
b2e1f8b7
import
os
import
sys
sys
.
path
.
insert
(
0
,
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
".."
))
import
torch
from
framework.base
import
BaseOperatorTest
,
TensorSpec
,
TestCase
from
framework.runner
import
GenericTestRunner
from
infinicore.nn.functional
import
RopeAlgo
import
infinicore
# ==============================================================================
# Operator-specific configuration
# ==============================================================================
# Test cases format: (x_shape)
# bs, seq_len, num_heads, head_dim
_TEST_CASES_DATA = [
    # Basic cases
    (1, 10, 32, 64),
    (2, 2, 32, 64),
    (5, 10, 32, 64),
]

# Tolerance configuration
# Per-dtype absolute/relative tolerances used when comparing the torch
# reference against the infinicore implementation.
_TOLERANCE_MAP = {
    infinicore.float16: {"atol": 1e-2, "rtol": 1e-2},
    infinicore.float32: {"atol": 1e-3, "rtol": 1e-3},
    infinicore.bfloat16: {"atol": 5e-2, "rtol": 5e-2},
}

# Data types to test
# NOTE(review): bfloat16 has a tolerance entry above but is not exercised here.
_TENSOR_DTYPES = [infinicore.float16, infinicore.float32]
def parse_test_cases():
    """
    Parse test case data and return list of TestCase objects for all operation types.

    Each test case contains all necessary information for execution and validation.
    One out-of-place case is generated per (shape, dtype) combination.
    """
    # Hoisted loop invariants: these were recomputed on every dtype iteration.
    max_position_embeddings = 1024
    rope_theta = 10000.0
    test_cases = []
    for bs, seq_len, num_heads, head_dim in _TEST_CASES_DATA:
        strides = None  # contiguous layout

        # Generate test cases for all data types
        for dtype in _TENSOR_DTYPES:
            tolerance = _TOLERANCE_MAP.get(dtype, {"atol": 0, "rtol": 1e-3})
            x_shape = [bs, seq_len, num_heads, head_dim]

            # Create typed tensor specs
            x_spec = TensorSpec.from_tensor(x_shape, strides, dtype, name="x")

            # Test Case 1: Out-of-place (return value)
            test_cases.append(
                TestCase(
                    inputs=[x_spec],
                    kwargs={
                        "max_position_embeddings": max_position_embeddings,
                        "rope_theta": rope_theta,
                    },
                    output_spec=None,
                    comparison_target=None,
                    tolerance=tolerance,
                    # Plain string: original was an f-string with no placeholders.
                    description="nn.RoPE - OUT_OF_PLACE",
                )
            )

    return test_cases
def rotary_embedding(
    t,
    max_position_embeddings,
    rope_theta,
    head_dim,
    algo=RopeAlgo.GPT_NEOX,
):
    # Torch reference implementation of RoPE used as the comparison oracle.
    # `t` is expected to be (bs, seq_len, num_heads, head_dim); broadcasting
    # below requires max_position_embeddings == t.shape[1] — the caller
    # (OpTest.torch_operator) passes seq_len here. TODO confirm for other callers.
    def create_sin_cos_table(
        max_position,
        head_dim,
        theta=10000.0,
        torch_dtype=torch.float32,
        torch_device="cpu",
    ):
        # Same construction as the numpy version in modules/rope.py:
        # angles[p, i] = p * theta^(-2i/head_dim).
        assert head_dim % 2 == 0, "Embedding dimension must be even."
        pos = torch.arange(0, max_position)
        freqs = 1.0 / (
            theta ** (torch.arange(0, head_dim, 2)[: (head_dim // 2)].float() / head_dim)
        )
        angles = torch.outer(pos, freqs)
        return torch.sin(angles).to(dtype=torch_dtype, device=torch_device), torch.cos(
            angles
        ).to(dtype=torch_dtype, device=torch_device)

    def _torch_rope(sin, cos, t1, t2):
        # Apply the 2D rotation to each (t1, t2) channel pair.
        cos = cos.unsqueeze(1)  # [seq_len, 1, dh // 2]
        sin = sin.unsqueeze(1)  # [seq_len, 1, dh // 2]
        t_out_1 = t1 * cos - t2 * sin
        t_out_2 = t1 * sin + t2 * cos
        return t_out_1, t_out_2

    sin, cos = create_sin_cos_table(
        max_position_embeddings, head_dim, rope_theta, torch_device=t.device
    )
    ans = t.clone()  # out-of-place: the input tensor is not modified
    dh = t.shape[-1]
    dt = t.dtype
    assert dh % 2 == 0, "Embedding dimension must be even."
    if RopeAlgo.GPT_J == algo:
        # GPT-J layout: rotate interleaved (even, odd) channel pairs.
        t_even = t[..., 0::2]  # [seq_len, n_head, dh // 2]
        t_odd = t[..., 1::2]  # [seq_len, n_head, dh // 2]
        t_out_even, t_out_odd = _torch_rope(sin, cos, t_even, t_odd)
        ans[..., 0::2] = t_out_even.to(dt)
        ans[..., 1::2] = t_out_odd.to(dt)
    elif RopeAlgo.GPT_NEOX == algo:
        # GPT-NeoX layout: rotate (first half, second half) channel pairs.
        half_dim = dh // 2
        t_first = t[..., :half_dim]
        t_second = t[..., half_dim:]
        t_out_first, t_out_second = _torch_rope(sin, cos, t_first, t_second)
        ans[..., :half_dim] = t_out_first.to(dt)
        ans[..., half_dim:] = t_out_second.to(dt)
    else:
        # NOTE(review): a ValueError would be more conventional here; kept as
        # KeyError to preserve the exception type callers might catch.
        raise KeyError("error Algo ")
    return ans
class OpTest(BaseOperatorTest):
    """nn.RoPE test with simplified implementation"""

    def __init__(self):
        super().__init__("nn.RoPE")

    def get_test_cases(self):
        return parse_test_cases()

    def torch_operator(self, x, max_position_embeddings, rope_theta):
        """PyTorch nn.RoPE implementation"""
        bs, seq_len, num_heads, head_dim = x.shape
        # Tables only need to cover the actual sequence length, so seq_len is
        # passed as max_position_embeddings to the reference implementation.
        return rotary_embedding(x, seq_len, rope_theta, head_dim)

    def infinicore_operator(self, x, max_position_embeddings, rope_theta):
        """InfiniCore nn.RoPE implementation"""
        bs, seq_len, num_heads, head_dim = x.shape
        torch_device = "cpu"
        if x.device.type != "cpu":
            # NOTE(review): assumes any non-CPU device is CUDA — confirm.
            torch_device = "cuda"

        # Build position ids: [0, seq_len) replicated across the batch.
        pos_ids_torch = torch.arange(
            0, seq_len, dtype=torch.int32, device=torch_device
        )
        pos_ids_torch = pos_ids_torch.unsqueeze(0)
        pos_ids_torch = pos_ids_torch.expand(bs, seq_len).contiguous()
        pos_ids_infini = infinicore.from_torch(pos_ids_torch)

        # Instantiate the RoPE module on the input's device/dtype.
        rope_instance = infinicore.nn.RoPE(
            max_position_embeddings,
            rope_theta,
            head_dim,
            device=x.device,
            dtype=x.dtype,
        )

        # Apply the rotation.
        # NOTE(review): RoPE.forward calls F.rope with out=states, so `x` is
        # mutated in place and `y` aliases it — verify the framework compares
        # against the torch result before/without reusing `x`.
        y = rope_instance(x, pos_ids_infini)
        return y
def main():
    """Main entry point"""
    GenericTestRunner(OpTest).run_and_exit()


if __name__ == "__main__":
    main()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment