Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
tsoc
migraphx-auto-test
Commits
2e912f00
Commit
2e912f00
authored
Feb 06, 2026
by
wangkx1
Browse files
init
parents
Changes
24
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
408 additions
and
0 deletions
+408
-0
scripts/__pycache__/monitor.cpython-310.pyc
scripts/__pycache__/monitor.cpython-310.pyc
+0
-0
scripts/base_runner.py
scripts/base_runner.py
+90
-0
scripts/model_runner.py
scripts/model_runner.py
+203
-0
scripts/monitor.py
scripts/monitor.py
+115
-0
No files found.
scripts/__pycache__/monitor.cpython-310.pyc
0 → 100644
View file @
2e912f00
File added
scripts/base_runner.py
0 → 100644
View file @
2e912f00
# scripts/base_runner.py
import
subprocess
import
os
from
typing
import
Dict
,
Any
,
List
,
Tuple
class BaseRunner:
    """Base model runner: builds and executes ``migraphx-driver perf`` commands.

    The runner reads its settings from ``config['base']['common']`` and keeps
    a reference to the ``env`` object supplied by the caller (this class does
    not use ``env`` itself).
    """

    def __init__(self, config: Dict[str, Any], env):
        """Store the configuration and cache the ``base`` / ``common`` sections.

        Args:
            config: Top-level configuration mapping.
            env: Environment helper object; stored as-is.
        """
        self.config = config
        self.env = env
        # ``or {}`` also covers keys that are present but null in the YAML,
        # which ``.get(key, {})`` would pass through as None.
        self.base_config = self.config.get('base') or {}
        self.common_config = self.base_config.get('common') or {}

    def build_command(self, model_file: str, model_config: Dict[str, Any], batch_size: int) -> List[str]:
        """Build the ``migraphx-driver perf`` argument list for one run.

        Args:
            model_file: Path of the model file handed to the driver.
            model_config: Per-model settings; the keys read here are
                ``iterations``, ``inputs`` (list of ``{name, shape}``) and
                ``extra_args``.
            batch_size: Batch size, prepended to every input shape.

        Returns:
            The command as a list of strings, ready for ``subprocess``.
        """
        # Base command.
        cmd = [
            self.common_config.get('migraphx_driver', '/opt/dtk/bin/migraphx-driver'),
            "perf",
            model_file,
        ]

        # FP16 is on unless the common config disables it explicitly.
        if self.common_config.get('fp16', True):
            cmd.append("--fp16")

        # Iteration count: the per-model value wins over the common one.
        iterations = model_config.get('iterations', self.common_config.get('iterations', 100))
        cmd.extend(["-n", str(iterations)])

        # Input dimensions: the batch size becomes the first dimension.
        for input_config in model_config.get('inputs') or []:
            input_name = input_config.get('name', 'x')
            shape = input_config.get('shape') or []
            cmd.extend([
                "--input-dim",
                f"@{input_name}",
                str(batch_size),
                *(str(dim) for dim in shape),
            ])

        # Extra arguments. A single string is split shell-style instead of
        # being extended character by character, and every item is
        # stringified because subprocess rejects non-string arguments.
        extra_args = model_config.get('extra_args') or []
        if isinstance(extra_args, str):
            import shlex
            extra_args = shlex.split(extra_args)
        cmd.extend(str(arg) for arg in extra_args)

        return cmd

    def run_model(self, cmd: List[str], log_file: str) -> Tuple[bool, str]:
        """Run ``cmd``, echoing its output to the terminal and to ``log_file``.

        stderr is merged into stdout. The log file is truncated first.

        Args:
            cmd: Command and arguments to execute.
            log_file: Path of the log file to write.

        Returns:
            ``(success, output)``. ``success`` is True only when the process
            exits with code 0; ``output`` is the captured text, or the error
            message when the command could not be run or logged.
        """
        try:
            with open(log_file, 'w') as f:
                print(f"执行命令: {' '.join(cmd)}")

                # Line-buffered text pipe so output is relayed as it appears.
                process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1,
                )
                output_lines = []
                try:
                    for line in process.stdout:
                        print(line, end='')
                        f.write(line)
                        output_lines.append(line)
                    process.wait()
                finally:
                    # If relaying failed midway, do not leave the child
                    # running or the pipe open.
                    if process.poll() is None:
                        process.kill()
                        process.wait()
                    process.stdout.close()

            return process.returncode == 0, ''.join(output_lines)

        except Exception as e:
            error_msg = f"执行命令失败: {e}"
            print(error_msg)
            # The log file itself may be the thing that failed (e.g. missing
            # directory); a second failure here must not escape the handler.
            try:
                with open(log_file, 'a') as f:
                    f.write(f"\n错误: {error_msg}\n")
            except OSError as log_error:
                print(f"写入日志失败: {log_error}")
            return False, error_msg
\ No newline at end of file
scripts/model_runner.py
0 → 100644
View file @
2e912f00
# scripts/model_runner.py
import
os
import
yaml
from
typing
import
Dict
,
Any
,
List
from
.base_runner
import
BaseRunner
from
.monitor
import
MemoryMonitor
class ModelRunner(BaseRunner):
    """Model runner: benchmarks configured models while sampling device memory.

    Per-model settings are read from ``config/<model_name>.yaml``; the list of
    models to run comes from ``base_config['models_to_run']``.
    """

    def __init__(self, config: Dict[str, Any], env):
        """Forward ``config`` and ``env`` to :class:`BaseRunner`."""
        super().__init__(config, env)

    def load_model_config(self, model_name: str) -> Dict[str, Any]:
        """Load ``config/<model_name>.yaml`` (relative to the working directory).

        Args:
            model_name: Name of the model; used as the YAML file stem.

        Returns:
            The parsed mapping, or an empty dict when the file is empty.

        Raises:
            FileNotFoundError: If the configuration file does not exist.
        """
        config_file = f"config/{model_name}.yaml"
        if not os.path.exists(config_file):
            raise FileNotFoundError(f"模型配置文件不存在: {config_file}")

        with open(config_file, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty document; callers use .get().
            return yaml.safe_load(f) or {}

    def run_single_model(self, model_name: str):
        """Run every configured batch size / model file for one model.

        A missing configuration file is reported and the model is skipped.

        Args:
            model_name: Name of the model whose YAML config should be loaded.
        """
        print(f"\n{'=' * 60}")
        print(f"开始测试模型: {model_name}")
        print(f"{'=' * 60}")

        # Load the model configuration.
        try:
            model_config = self.load_model_config(model_name)
        except FileNotFoundError as e:
            print(f"错误: {e}")
            return

        # The ``model`` section may be absent or null in the YAML.
        model_info = model_config.get('model') or {}

        # Apply the model's environment variables.
        self.env.setup(model_info.get('env_vars', {}))

        # Create the memory monitor.
        monitor = MemoryMonitor(
            device_id=self.env.device_id,
            log_file=self.base_config.get('monitor', {}).get('log_file', 'memory_simple.log'),
        )

        # Directory that receives the per-run log files.
        result_dir = self.env.get_result_dir()

        if 'model_files' in model_info:
            # One model file per batch size (e.g. YOLOv3).
            self.run_multiple_model_files(model_info, monitor, result_dir)
        else:
            # A single model file run at several batch sizes.
            self.run_single_model_file(model_info, monitor, result_dir)

        print(f"\n✓ {model_name} 测试完成!")

    def _run_and_report(self, model_file: str, model_info: Dict[str, Any], batch: int,
                        monitor: MemoryMonitor, result_dir: str) -> bool:
        """Benchmark one (model file, batch) pair and report memory statistics.

        The statistics are printed and appended to the run's log file.

        Returns:
            True when the driver command exited successfully.
        """
        log_file = os.path.join(result_dir, f"{os.path.basename(model_file)}-{batch}batch.log")

        total_memory = monitor.start()
        try:
            cmd = self.build_command(model_file, model_info, batch)
            success, _output = self.run_model(cmd, log_file)
        finally:
            # Always stop sampling, even if building or running the command
            # raised; otherwise the monitor thread would keep running.
            monitor.stop()

        stats = monitor.get_statistics()
        summary = [
            f"最大使用: {stats['max_used']:.2f} MiB",
            f"总显存: {total_memory} MiB",
            f"峰值使用率: {stats['max_percent']:.2f}%",
        ]

        print("\n=== 显存使用统计 ===")
        for line in summary:
            print(line)

        # Append the same summary to the run's log file.
        with open(log_file, 'a') as f:
            f.write("\n=== 统计摘要 ===\n")
            f.writelines(f"{line}\n" for line in summary)

        if success:
            print(f"✓ batch={batch} 测试完成,日志保存至: {log_file}")
        else:
            print(f"✗ batch={batch} 测试失败!")

        print(f"{'-' * 40}")
        return success

    def run_single_model_file(self, model_info: Dict[str, Any], monitor: MemoryMonitor, result_dir: str):
        """Run one model file at each batch size in ``model_info['batch_sizes']``.

        Args:
            model_info: The ``model`` section of the model configuration;
                reads ``model_file`` and ``batch_sizes`` (default ``[1, 8]``).
            monitor: Memory monitor started and stopped around each run.
            result_dir: Directory that receives the log files.
        """
        model_file = model_info.get('model_file')
        if not model_file or not os.path.exists(model_file):
            print(f"错误: 模型文件不存在: {model_file}")
            return

        batch_sizes = model_info.get('batch_sizes', [1, 8])

        print(f"模型文件: {model_file}")
        print(f"测试batch大小: {batch_sizes}")
        print(f"{'-' * 60}")

        for batch in batch_sizes:
            print(f"\n正在测试 batch={batch}...")
            print(f"{'-' * 40}")
            self._run_and_report(model_file, model_info, batch, monitor, result_dir)

    def run_multiple_model_files(self, model_info: Dict[str, Any], monitor: MemoryMonitor, result_dir: str):
        """Run several model files, each with its own batch size.

        Used when different batch sizes need different files (e.g. YOLOv3).
        Entries whose file is missing are skipped with a warning.

        Args:
            model_info: The ``model`` section of the model configuration;
                reads ``model_files``, a list of ``{path, batch}`` entries
                (``batch`` defaults to 1).
            monitor: Memory monitor started and stopped around each run.
            result_dir: Directory that receives the log files.
        """
        model_files = model_info.get('model_files', [])
        if not model_files:
            print("错误: 没有找到模型文件配置")
            return

        print("测试多个模型文件...")
        print(f"{'-' * 60}")

        for model_file_info in model_files:
            model_file = model_file_info.get('path')
            batch = model_file_info.get('batch', 1)

            if not model_file or not os.path.exists(model_file):
                print(f"警告: 模型文件不存在,跳过: {model_file}")
                continue

            print(f"\n正在测试 batch={batch}...")
            print(f"模型文件: {model_file}")
            print(f"{'-' * 40}")
            self._run_and_report(model_file, model_info, batch, monitor, result_dir)

    def run_all_models(self):
        """Run every model listed in ``base_config['models_to_run']``.

        A failure in one model is reported and does not stop the others.
        """
        models_to_run = self.base_config.get('models_to_run', [])
        if not models_to_run:
            print("错误: 没有配置要运行的模型")
            return

        print(f"将运行以下模型: {models_to_run}")

        for model_name in models_to_run:
            # Top-level boundary: report the error and move on to the next model.
            try:
                self.run_single_model(model_name)
            except Exception as e:
                print(f"运行模型 {model_name} 时出错: {e}")
                continue

        print(f"\n{'=' * 60}")
        print("所有测试完成!")
        print(f"{'=' * 60}")
\ No newline at end of file
scripts/monitor.py
0 → 100644
View file @
2e912f00
# scripts/monitor.py
import
subprocess
import
threading
import
time
from
datetime
import
datetime
from
typing
import
Optional
class MemoryMonitor:
    """Samples device memory usage in a background thread via the ``hy-smi`` CLI.

    Each sample is appended to ``log_file`` as
    ``<date> <time> <used_memory> <used_percent>``; ``get_statistics`` reads
    that file back to compute the peaks.

    NOTE(review): the percentage recorded is hy-smi's "HCU use" field —
    confirm that this is the metric intended as the memory usage rate.
    """

    def __init__(self, device_id: int, log_file: str = "memory_simple.log"):
        """
        Args:
            device_id: Device index passed to ``hy-smi -d``.
            log_file: Path of the sample log; truncated on every run.
        """
        self.device_id = device_id
        self.log_file = log_file
        self.monitoring = False
        self.monitor_thread: Optional[threading.Thread] = None
        self.total_memory: Optional[int] = None
        # Stop signal for the current run. stop() sets it and then installs a
        # fresh one, so a thread that outlived stop()'s join timeout still
        # sees its own (set) event and cannot be revived by the next start().
        self._stop_event = threading.Event()

    def _query_smi(self) -> str:
        """Run ``hy-smi`` for this device and return its stdout."""
        # Argument list instead of a shell string: nothing is shell-parsed.
        cmd = ["hy-smi", "-d", str(self.device_id), "--showmeminfo", "vram", "--showuse"]
        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.stdout

    @staticmethod
    def _field_value(line: str) -> Optional[str]:
        """Return the first token after the second ``:`` in ``line``, if any."""
        parts = line.split(':')
        if len(parts) < 3:
            return None
        tokens = parts[2].split()
        return tokens[0] if tokens else None

    def get_total_memory(self) -> Optional[int]:
        """Return the value of the "vram Total Memory" line as an int.

        Returns:
            The parsed value, or None when ``hy-smi`` cannot be run or the
            line is missing or unparsable.
        """
        try:
            for line in self._query_smi().splitlines():
                if "vram Total Memory" in line:
                    value = self._field_value(line)
                    if value is not None:
                        return int(value)
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            print(f"获取总显存失败: {e}")
        return None

    def monitor_memory(self, stop_event: Optional[threading.Event] = None):
        """Sampling loop: poll ``hy-smi`` about once per second and log samples.

        Runs until ``self.monitoring`` becomes False or ``stop_event`` is set.

        Args:
            stop_event: Event that ends the loop; defaults to the monitor's
                current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        # Truncate the log so statistics only cover this run.
        with open(self.log_file, 'w'):
            pass

        while self.monitoring and not stop_event.is_set():
            # Thread boundary: report any sampling error and keep sampling.
            try:
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                used_memory = None
                used_percent = None
                for line in self._query_smi().splitlines():
                    if "vram Total Used Memory" in line:
                        value = self._field_value(line)
                        if value is not None:
                            used_memory = value
                    elif "HCU use" in line:
                        value = self._field_value(line)
                        if value is not None:
                            used_percent = value

                # Only complete samples are recorded.
                if used_memory and used_percent:
                    with open(self.log_file, 'a') as f:
                        f.write(f"{timestamp} {used_memory} {used_percent}\n")

            except Exception as e:
                print(f"监控出错: {e}")

            # Wait on the event rather than sleeping so stop() takes effect
            # immediately instead of after the remainder of the interval.
            stop_event.wait(1)

    def start(self):
        """Query the total memory and start the background sampling thread.

        If a previous sampling thread is still running it is stopped first.

        Returns:
            The total memory reported by ``get_total_memory`` (may be None).
        """
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            self.stop()

        self.total_memory = self.get_total_memory()
        if self.total_memory:
            print(f"总显存: {self.total_memory} MiB")

        print("开始监控显存使用...")
        self.monitoring = True
        # Daemon thread: a sampler that was never stopped must not keep the
        # interpreter alive at exit.
        self.monitor_thread = threading.Thread(
            target=self.monitor_memory,
            args=(self._stop_event,),
            daemon=True,
        )
        self.monitor_thread.start()

        return self.total_memory

    def stop(self):
        """Signal the sampling thread to stop and wait up to 2 seconds for it."""
        self.monitoring = False
        self._stop_event.set()
        thread = self.monitor_thread
        if thread is not None:
            thread.join(timeout=2)
        # Fresh event for the next run; the old thread keeps its set event.
        self._stop_event = threading.Event()

    def get_statistics(self) -> dict[str, Optional[float]]:
        """Compute peak usage from the sample log.

        Lines that cannot be parsed are skipped instead of discarding all
        samples.

        Returns:
            A dict with ``total_memory`` (as last queried by ``start``),
            ``max_used`` and ``max_percent`` (both 0 when no samples exist
            or the log cannot be read).
        """
        stats = {
            "total_memory": self.total_memory,
            "max_used": 0,
            "max_percent": 0,
        }

        try:
            with open(self.log_file, 'r') as f:
                lines = f.readlines()
        except OSError as e:
            print(f"读取监控日志失败: {e}")
            return stats

        used_values = []
        percent_values = []
        for line in lines:
            # Fields: date, time, used memory, usage percent.
            fields = line.split()
            if len(fields) >= 3:
                try:
                    used_values.append(float(fields[2]))
                except ValueError:
                    pass
            if len(fields) >= 4:
                try:
                    percent_values.append(float(fields[3]))
                except ValueError:
                    pass

        if used_values:
            stats["max_used"] = max(used_values)
        if percent_values:
            stats["max_percent"] = max(percent_values)

        return stats
\ No newline at end of file
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment