Commit 39a85c88 authored by zk's avatar zk
Browse files

新增migraphx脚本推理

parent a1865640
......@@ -216,6 +216,16 @@ bash migraphx_export.bash
bash migraphx_perf.bash
```
4. 使用python脚本测试
```bash
python migraphx_infer.py    # offload=False 推理:提前开辟 GPU 空间,数据放在 device 上推理
python migraphx_infer1.py   # offload=True 推理:使用 host 数据推理,会慢一些
```
-----
## 8\. 测试结果对比
......@@ -252,7 +262,8 @@ bash migraphx_perf.bash
| **ORT + Plugin** | +自定义算子<br>+FP16 纯量化方案 B | `ground_deform_fp16_all.onnx` | `ort_plugin_fp16_B` | 105.35 | 9.49 |
| **ORT + Plugin** | +自定义算子<br>+FP16 极致优化方案 C | `ground_deform_fp16_all.onnx` | `ort_plugin_fp16_C` | 100.91 | 9.90 |
### 8.3 migraphx BW100 测试结果
### 8.3 migraphx BW150和BW100 测试结果
BW100示例结果:
```
Batch size: 1
Rate: 6.05197 inferences/sec
......@@ -263,6 +274,15 @@ Total instructions time: 205.275ms
Overhead time: 2.32812ms, -40.0399ms
Overhead: 1%, -24%
```
汇总结果:
| 设备 | 推理方式 | FPS | 平均推理时间 (ms) |
| :--- | :--- | :--- | :--- |
| BW150 | migraphx-driver | 14.93 | 66.97 |
| BW150 | Python + MIGraphX(device) | 13.65 | 73.20(包含前后处理) |
| BW100 | migraphx-driver | 13.54 | 73.87 |
| BW100 | Python + MIGraphX(device) | 12.12 | 82.44(包含前后处理) |
-----
## 参考项目
......
......@@ -214,6 +214,7 @@ if __name__ == '__main__':
image_source, image = load_image(img_path)
providers = [
# 'MIGraphXExecutionProvider',
'ROCMExecutionProvider',
'CPUExecutionProvider'
]
......
export MIGRAPHX_ENABLE_MIOPEN_CONCAT=1
export MIGRAPHX_TRACE_COMPILE=1
migraphx-driver perf --onnx \
../weights/ground_opt.onnx \
../weights/ground_opt_0430.onnx \
--fp16 \
--output \
../weights/ground_opt.mxr
\ No newline at end of file
../weights/ground_opt_0430.mxr
# ../weights/ground_opt_0430.mxr > migraphx_log.log 2>&1
\ No newline at end of file
......@@ -3,203 +3,208 @@ import numpy as np
import torch
import time
import os
import bisect
import migraphx
from transformers import BertTokenizer
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
from typing import Tuple, List, Dict
import groundingdino.datasets.transforms as T
from PIL import Image
# =========================
# 工具函数
# 预处理
# =========================
def load_image(image_path: str) -> Tuple[np.array, torch.Tensor]:
    """Open an image file and return it both raw and preprocessed.

    The file is opened with PIL and converted to RGB. The RGB image is then
    passed through a ``T.Compose`` pipeline made of ``RandomResize``,
    ``ToTensor`` and ``Normalize`` (mean ``[0.485, 0.456, 0.406]``,
    std ``[0.229, 0.224, 0.225]``).

    Args:
        image_path: Path of the image file to open.

    Returns:
        A 2-tuple ``(pixels, transformed)``: the RGB image as a NumPy array,
        and the first element returned by the transform pipeline.
    """
    pipeline = T.Compose(
        [
            T.RandomResize([800], max_size=1333),
            T.ToTensor(),
            T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    )
    rgb_image = Image.open(image_path).convert("RGB")
    pixels = np.asarray(rgb_image)
    # The pipeline takes (image, target); there is no target here, and the
    # second element of its result is discarded.
    transformed, _ = pipeline(rgb_image, None)
    return pixels, transformed
def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of ``x``, computed with NumPy."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator
def preprocess_caption(caption: str) -> str:
    """Lower-case and strip ``caption``, appending a ``.`` if it does not already end with one."""
    normalized = caption.lower().strip()
    return normalized if normalized.endswith(".") else normalized + "."
def to_mgx(x):
    """Wrap a NumPy array in a ``migraphx.argument``.

    Arrays whose dtype is int64 or bool keep that dtype; any other dtype is
    converted to float32. In every case the argument is built from the
    result of ``astype``, not from ``x`` itself.
    """
    for kept_dtype in (np.int64, np.bool_):
        if x.dtype == kept_dtype:
            return migraphx.argument(x.astype(kept_dtype))
    return migraphx.argument(x.astype(np.float32))
def _mgx_shape_to_numpy(shape):
# 将 migraphx input shape 映射到 numpy dtype + lens 以生成零填充张量
shape_str = str(shape)
if "int64_type" in shape_str:
dtype = np.int64
elif "bool_type" in shape_str:
dtype = np.bool_
elif "half_type" in shape_str:
dtype = np.float16
# =========================
# 文本标签还原逻辑 (移除 Tokenizer 依赖)
# =========================
def get_phrases_from_posmap(
posmap: np.ndarray, tokens: List[str], left_idx: int = 0, right_idx: int = 255
):
"""
直接用字符串列表映射,抛弃沉重的 Tokenizer
"""
assert isinstance(posmap, np.ndarray), "posmap must be np.ndarray"
if posmap.ndim == 1:
# 将指定范围内的元素设为 False
posmap[:left_idx + 1] = False
posmap[right_idx:] = False
# 获取非零元素的索引
non_zero_idx = np.nonzero(posmap)[0]
# 提取被激活的单词,并自动过滤掉特殊占位符
words = [tokens[i] for i in non_zero_idx if tokens[i] not in ["[CLS]", "[SEP]", "."]]
return " ".join(words).strip()
else:
dtype = np.float32
try:
dims = list(shape.dims())
except Exception:
dims = []
try:
lens = list(shape.lens())
except Exception:
lens = []
# 优先用 dims,dims 为空时才退化到 lens
return dtype, (dims if len(dims) > 0 else lens)
raise NotImplementedError("posmap must be 1-dim")
# =========================
# 分配输出 GPU 内存 (offload_copy=False 必须)
# =========================
def allocate_output_memory(model):
    """Allocate one GPU buffer per model output.

    For every key in ``model.get_outputs()`` a buffer is created with
    ``migraphx.allocate_gpu`` using that output's shape.

    Returns:
        Dict mapping each output key to its allocated buffer.
    """
    output_shapes = model.get_outputs()
    return {
        name: migraphx.allocate_gpu(s=output_shapes[name])
        for name in output_shapes.keys()
    }
# =========================
# 🚀 MIGraphX 推理类(带缓存)
# MIGraphX 模型类
# =========================
class MIGraphXModel:
def __init__(self, onnx_path, cache_path="weights/ground_opt.mxr", force_recompile=False):
def __init__(self,
onnx_path,
cache_path="../weights/ground_opt_0430.mxr",
device_id=3,
force_recompile=False):
self.cache_path = cache_path
# ====== 优先加载缓存 ======
if os.path.exists(cache_path) and not force_recompile:
print(f"⚡ 直接加载已编译模型: {cache_path}")
print(f"⚡ 直接加载缓存模型: {cache_path}")
self.model = migraphx.load(cache_path)
else:
print("🔍 从 ONNX 构建 MIGraphX")
self.model = migraphx.parse_onnx(onnx_path)
print(self.model)
# ====================== 2. 打印模型输入输出信息 ======================
print("=== 模型输入信息 ===")
inputs = self.model.get_inputs()
for key, value in inputs.items():
print(f"{key}: {value}")
print("\n=== 模型输出信息 ===")
outputs = self.model.get_outputs()
for key, value in outputs.items():
print(f"{key}: {value}")
"""
=== 模型输入信息 ===
text_token_mask: bool_type, {1, 4, 4}, {16, 4, 1}
token_type_ids: int64_type, {1, 4}, {4, 1}
position_ids: int64_type, {1, 4}, {4, 1}
attention_mask: bool_type, {1, 4}, {4, 1}
input_ids: int64_type, {1, 4}, {4, 1}
img: float_type, {1, 3, 800, 1200}, {2880000, 960000, 1200, 1}
=== 模型输出信息 ===
boxes: float_type, {1, 900, 4}, {3600, 4, 1}
logits: float_type, {1, 900, 256}, {230400, 256, 1}
输入节点名称: text_token_mask
输入形状 (N, C, H, W): [1, 4, 4]
"""
# print("\n⚡ 量化模型(FP16)")
# migraphx.quantize_fp16(self.model)
print("⚙️ 编译 MIGraphX(GPU)")
print("🔍 从 ONNX 构建模型")
self.model = migraphx.parse_onnx(onnx_path)
print("\n=== 输入信息 ===")
for k, v in self.model.get_inputs().items():
print(f"{k}: {v}")
print("\n=== 输出信息 ===")
for k, v in self.model.get_outputs().items():
print(f"{k}: {v}")
print("\n⚙️ 编译模型(GPU + offload=false)")
self.model.compile(
t=migraphx.get_target("gpu"),device_id=5
t=migraphx.get_target("gpu"),
offload_copy=False,
device_id=device_id
)
# offload_copy=False, fast_math=False, exhaustive_tune=False
# ====== 保存缓存 ======
print(f"💾 保存编译模型到: {cache_path}")
print(f"💾 保存 mxr: {cache_path}")
migraphx.save(self.model, cache_path)
self.inputs = self.model.get_inputs()
self.outputs = self.model.get_outputs()
self.param_names = self.model.get_parameter_names()
self.input_shapes = self.model.get_inputs()
print("✅ param_names:", self.param_names)
print("✅ input_shape:", self.input_shapes)
try:
self.output_shapes = self.model.get_outputs()
print("✅ output_shapes keys:", list(self.output_shapes.keys()))
except Exception:
self.output_shapes = None
print("✅ input_shape:", self.inputs)
print("✅ output_shapes keys:", list(self.outputs.keys()))
self.output_gpu = allocate_output_memory(self.model)
print("✅ 模型初始化完成")
def infer(self, input_dict):
# 只按模型 get_inputs() 定义的输入签名来组装
mgx_inputs = {}
provided_names = set(input_dict.keys())
# 某些 mxr 会把内部输出别名也暴露到 get_parameter_names/get_inputs 里,
# 这里显式排除 main:#output_*,避免把内部输出当成输入填充。
required_names = {
k for k in self.input_shapes.keys()
if not str(k).startswith("main:#output")
}
missing = required_names - provided_names
if missing:
print("⚠️ 缺失模型输入,准备按 shape 自动补齐:")
for name in sorted(missing):
shape = self.input_shapes[name]
dtype, lens = _mgx_shape_to_numpy(shape)
mgx_inputs[name] = to_mgx(np.zeros(lens, dtype=dtype))
print(f" - {name}: shape={lens}, dtype={dtype.__name__}")
for name in (required_names & provided_names):
mgx_inputs[name] = to_mgx(input_dict[name])
# 额外的 key 不喂给模型,避免和内部签名冲突
extra = provided_names - required_names
if extra:
print("ℹ️ 有多余输入参数将被忽略:")
for name in sorted(extra):
print(f" - {name}")
mgx_data = self.output_gpu.copy()
for name in self.inputs.keys():
data = input_dict[name]
if data.dtype == np.float64:
data = data.astype(np.float32)
mgx_data[name] = migraphx.to_gpu(migraphx.argument(data))
start = time.time()
result = self.model.run(mgx_inputs)
results = self.model.run(mgx_data)
infer_time = time.time() - start
outputs = [np.array(r) for r in result]
outputs = [
np.array(migraphx.from_gpu(r))
for r in results
]
return outputs, infer_time
# =========================
# 推理函数
# 推理逻辑 (引入真正的后处理还原)
# =========================
def predict(
model,
tokenizer,
image,
caption,
text_cache,
box_threshold,
text_threshold,
remove_combined=False,
is_benchmark=False
):
# 提前针对car .生成对应输入
) -> Tuple[np.ndarray, np.ndarray, List[str]]:
# 使用传入的 text_cache 替代硬编码
input_dict = {
"img": np.expand_dims(np.asarray(image), axis=0).astype(np.float32),
"position_ids": np.array([[0, 0, 1, 0]], dtype=np.int64),
"input_ids": np.array([[101, 2482, 1012, 102]], dtype=np.int64),
"token_type_ids": np.array([[0, 0, 0, 0]], dtype=np.int64),
"text_token_mask": np.array([[
[True, False, False, False],
[False, True, True, False],
[False, True, True, False],
[False, False, False, True]
]], dtype=np.bool_),
"attention_mask": np.array([[True, True, True, True]], dtype=np.bool_)
"input_ids": text_cache['input_ids'],
"attention_mask": text_cache['attention_mask'],
"position_ids": text_cache['position_ids'],
"token_type_ids": text_cache['token_type_ids'],
"text_token_mask": text_cache['text_token_mask']
}
outputs, infer_time = model.infer(input_dict)
if not is_benchmark:
print(f"Inference time: {infer_time*1000:.2f} ms")
print(f"Inference time: {infer_time:.3f}s")
logits = sigmoid(outputs[0][0])
boxes = outputs[1][0]
t0 = time.time()
prediction_logits = sigmoid(outputs[0][0])
prediction_boxes = outputs[1][0]
post_time = time.time() - t0
max_values = np.max(logits, axis=1)
if not is_benchmark:
print(f"post time: {post_time:.3f}s")
print(f"\n=== Debug Info ===")
print(f"Prediction logits shape: {prediction_logits.shape}")
print(f"Prediction boxes shape: {prediction_boxes.shape}")
print(f"Max logit value: {np.max(prediction_logits):.4f}")
print(f"Mean logit value: {np.mean(prediction_logits):.4f}")
# 1. 框过滤
max_values = np.max(prediction_logits, axis=1)
mask = max_values > box_threshold
logits = logits[mask]
boxes = boxes[mask]
phrases = ["object"] * len(boxes)
logits = prediction_logits[mask]
boxes = prediction_boxes[mask]
tokens = text_cache['tokens']
input_ids = text_cache['input_ids'][0].tolist()
if remove_combined:
sep_idx = [i for i in range(len(input_ids)) if input_ids[i] in [101, 102, 1012]]
phrases = []
for logit in logits:
max_idx = logit.argmax()
insert_idx = bisect.bisect_left(sep_idx, max_idx)
right_idx = sep_idx[insert_idx]
left_idx = sep_idx[insert_idx - 1]
phrases.append(
get_phrases_from_posmap(logit > text_threshold, tokens, left_idx, right_idx)
)
else:
phrases = [
get_phrases_from_posmap(logit > text_threshold, tokens)
for logit in logits
]
return boxes, np.max(logits, axis=1), phrases
......@@ -207,20 +212,62 @@ def predict(
# =========================
# Benchmark
# =========================
def benchmark(model, tokenizer, image, caption, box_th, text_th, warmup=5, runs=10):
    """Time repeated ``predict`` calls and print the mean latency and FPS.

    ``predict`` is first called ``warmup`` times without timing, then
    ``runs`` times with each call timed by ``time.time()``.
    """
    print("\n🔥 预热")
    for _ in range(warmup):
        predict(model, tokenizer, image, caption, box_th, text_th, True)

    print("\n🚀 测试")
    durations = []
    for _ in range(runs):
        began = time.time()
        predict(model, tokenizer, image, caption, box_th, text_th, True)
        durations.append(time.time() - began)

    mean_seconds = np.mean(durations)
    print(f"\n平均耗时: {mean_seconds*1000:.2f} ms")
    print(f"FPS: {1/mean_seconds:.2f}")
def benchmark_performance(
    model, image, text_cache, box_threshold, text_threshold,
    warmup_runs=5, test_runs=10
):
    """Benchmark ``predict`` with a warm-up phase followed by timed runs.

    Each run calls ``predict(model, image, text_cache, box_threshold,
    text_threshold, is_benchmark=True)``. Warm-up runs are timed and printed
    but excluded from the statistics.

    Args:
        model, image, text_cache, box_threshold, text_threshold: Forwarded
            unchanged to ``predict``.
        warmup_runs: Number of untimed-for-statistics warm-up calls. Zero or
            a negative value skips the warm-up.
        test_runs: Number of measured calls; must be at least 1.

    Returns:
        Dict with keys ``warmup_runs``, ``test_runs``, ``avg_infer_time_ms``,
        ``std_infer_time_ms``, ``max_infer_time_ms``, ``min_infer_time_ms``
        and ``fps`` (``test_runs`` divided by the wall time of the whole
        measured phase).

    Raises:
        ValueError: If ``test_runs`` is smaller than 1.
    """
    # Fail before any work is done: with no measured run the statistics
    # below (np.max / np.min on an empty list) cannot be computed.
    if test_runs < 1:
        raise ValueError("test_runs must be at least 1")

    print("="*60)
    print("📊 开始性能测试(包含预热+实际推理)")
    print("="*60)

    print(f"\n🔥 预热阶段({warmup_runs} 次)- 不计入性能统计")
    # perf_counter is a monotonic, high-resolution clock meant for
    # measuring short intervals.
    warmup_start = time.perf_counter()
    for run_no in range(1, warmup_runs + 1):
        t0 = time.perf_counter()
        predict(model, image, text_cache, box_threshold, text_threshold, is_benchmark=True)
        warmup_time = time.perf_counter() - t0
        print(f"预热 {run_no}/{warmup_runs} - 耗时: {warmup_time*1000:.2f} ms")
    total_warmup_time = time.perf_counter() - warmup_start
    if warmup_runs > 0:
        print(f"\n预热完成 - 总耗时: {total_warmup_time:.3f} s, 平均每次: {total_warmup_time/warmup_runs*1000:.2f} ms")
    else:
        # No warm-up was executed, so a per-run average would divide by zero.
        print(f"\n预热完成 - 总耗时: {total_warmup_time:.3f} s")

    print(f"\n🚀 实际推理测试阶段({test_runs} 次)- 统计性能指标")
    test_start = time.perf_counter()
    infer_times = []
    for run_no in range(1, test_runs + 1):
        t0 = time.perf_counter()
        predict(model, image, text_cache, box_threshold, text_threshold, is_benchmark=True)
        infer_time = time.perf_counter() - t0
        infer_times.append(infer_time)
        print(f"实际推理 {run_no}/{test_runs} - 耗时: {infer_time*1000:.2f} ms")
    total_test_time = time.perf_counter() - test_start

    avg_infer_time = np.mean(infer_times)
    std_infer_time = np.std(infer_times)
    max_infer_time = np.max(infer_times)
    min_infer_time = np.min(infer_times)
    # FPS is derived from the wall time of the whole measured phase, so it
    # also includes the per-run print overhead.
    fps = test_runs / total_test_time

    print("\n" + "="*60)
    print("📈 性能测试报告(仅实际推理阶段)")
    print("="*60)
    print(f"测试次数: {test_runs} 次")
    print(f"总推理耗时: {total_test_time:.3f} s")
    print(f"平均推理耗时: {avg_infer_time*1000:.2f} ms (±{std_infer_time*1000:.2f} ms)")
    print(f"最大推理耗时: {max_infer_time*1000:.2f} ms")
    print(f"最小推理耗时: {min_infer_time*1000:.2f} ms")
    print(f"平均FPS: {fps:.2f} 帧/秒")
    print("="*60)

    return {
        "warmup_runs": warmup_runs,
        "test_runs": test_runs,
        "avg_infer_time_ms": avg_infer_time*1000,
        "std_infer_time_ms": std_infer_time*1000,
        "max_infer_time_ms": max_infer_time*1000,
        "min_infer_time_ms": min_infer_time*1000,
        "fps": fps
    }
# =========================
......@@ -228,31 +275,84 @@ def benchmark(model, tokenizer, image, caption, box_th, text_th, warmup=5, runs=
# =========================
if __name__ == "__main__":
model_path = "../weights/ground_opt.onnx"
cache_path = "../weights/ground_opt.mxr" # ⭐ 缓存文件
model_path = "../weights/ground_opt_0430.onnx"
cache_path = "../weights/ground_opt_0430.mxr"
img_path = "../images/in/car_1.jpg"
TEXT_PROMPT = "car ."
BOX_TRESHOLD = 0.35
TEXT_TRESHOLD = 0.25
WARMUP_RUNS = 5
TEST_RUNS = 10
# 🚀 加载模型(自动缓存)
model = MIGraphXModel(
model_path,
cache_path=cache_path,
force_recompile=False # 改成 True 可强制重编译
device_id=5,
force_recompile=False
)
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
image_source, image = load_image(img_path)
benchmark(model, tokenizer, image, TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD)
# =========================
# 提前计算得到的 Text Cache
# =========================
TEXT_CACHE = {
'input_ids': np.array([[ 101, 2482, 1012, 102]], dtype=np.int64),
'attention_mask': np.array([[ True, True, True, True]], dtype=np.bool_),
'position_ids': np.array([[0, 0, 1, 0]], dtype=np.int64),
'token_type_ids': np.array([[0, 0, 0, 0]], dtype=np.int64),
'text_token_mask': np.array([[[ True, False, False, False],
[False, True, True, False],
[False, True, True, False],
[False, False, False, True]]], dtype=np.bool_),
# 存放 ID 对应的单词,用于快速 decode
'tokens': ["[CLS]", "car", ".", "[SEP]"]
}
benchmark_performance(
model, image, TEXT_CACHE,
BOX_TRESHOLD, TEXT_TRESHOLD,
WARMUP_RUNS, TEST_RUNS
)
print("\n" + "="*60)
print("🎯 执行最终推理(带详细日志+保存结果)")
print("="*60)
# 传入 TEXT_CACHE
boxes, confs, phrases = predict(
model, tokenizer, image,
TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD
model, image, TEXT_CACHE,
BOX_TRESHOLD, TEXT_TRESHOLD
)
print("检测结果:", phrases)
\ No newline at end of file
print("\n🎯 执行最终推理并保存结果图")
ori_img = cv2.imread(img_path)
img_h = ori_img.shape[0]
img_w = ori_img.shape[1]
for i in range(len(boxes)):
one_box = boxes[i]
one_conf = confs[i]
one_cls = phrases[i]
x1 = int((one_box[0] - one_box[2] / 2) * img_w)
y1 = int((one_box[1] - one_box[3] / 2) * img_h)
x2 = int((one_box[0] + one_box[2] / 2) * img_w)
y2 = int((one_box[1] + one_box[3] / 2) * img_h)
cv2.rectangle(ori_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
# 此时打印的 one_cls 将是真实的类别名称(如 "car")
cv2.putText(
ori_img, f'{one_cls} {one_conf:.2f}',
(x1-15, y1-15),
fontFace=cv2.FONT_HERSHEY_SIMPLEX,
color=(255, 255, 255),
fontScale=1.5,
thickness=3
)
cv2.imwrite('../weights/result_migraphx.jpg', ori_img)
print(f"\n✅ 结果已保存至: ../weights/result_migraphx.jpg")
print(f"✅ 检测到目标: {phrases} (共 {len(boxes)} 个)")
\ No newline at end of file
import cv2
import numpy as np
import torch
import time
import os
import migraphx
from typing import Tuple
import torch
import groundingdino.datasets.transforms as T
from PIL import Image
"""
使用cpu数据做推理
"""
def load_image(image_path: str) -> Tuple[np.array, torch.Tensor]:
transform = T.Compose(
[
......@@ -25,7 +29,43 @@ def load_image(image_path: str) -> Tuple[np.array, torch.Tensor]:
def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of ``x``, computed with NumPy."""
    denominator = 1 + np.exp(-x)
    return 1 / denominator
# The ``tokens`` annotation is quoted so that defining this function does not
# require ``List`` to be imported (this script imports only ``Tuple``).
def get_phrases_from_posmap(
    posmap: np.ndarray, tokens: "List[str]", left_idx: int = 0, right_idx: int = 255
):
    """Build a phrase from the tokens selected by a 1-D position map.

    Tokens are looked up directly in a list of strings, so no tokenizer is
    needed.

    Args:
        posmap: 1-D array whose non-zero entries mark selected token
            positions. It is not modified.
        tokens: Token strings indexed by position.
        left_idx: Positions up to and including this index are ignored.
        right_idx: Positions from this index onward are ignored.

    Returns:
        The selected tokens joined by single spaces, with ``[CLS]``,
        ``[SEP]`` and ``.`` left out.

    Raises:
        NotImplementedError: If ``posmap`` is not one-dimensional.
    """
    assert isinstance(posmap, np.ndarray), "posmap must be np.ndarray"
    if posmap.ndim != 1:
        raise NotImplementedError("posmap must be 1-dim")

    # Mask a copy so the caller's array is left untouched.
    selected = posmap.copy()
    selected[:left_idx + 1] = False
    selected[right_idx:] = False

    skipped = ("[CLS]", "[SEP]", ".")
    token_count = len(tokens)
    # The map can be wider than the token list; active positions with no
    # matching token are ignored instead of raising IndexError.
    words = [
        tokens[i]
        for i in np.nonzero(selected)[0]
        if i < token_count and tokens[i] not in skipped
    ]
    return " ".join(words).strip()
def preprocess_caption(caption: str) -> str:
    """Lower-case and strip ``caption``, appending a ``.`` if it does not already end with one."""
    normalized = caption.lower().strip()
    return normalized if normalized.endswith(".") else normalized + "."
def to_mgx(x):
    """Wrap a NumPy array in a ``migraphx.argument``.

    Arrays whose dtype is int64 or bool keep that dtype; any other dtype is
    converted to float32. In every case the argument is built from the
    result of ``astype``, not from ``x`` itself.
    """
    for kept_dtype in (np.int64, np.bool_):
        if x.dtype == kept_dtype:
            return migraphx.argument(x.astype(kept_dtype))
    return migraphx.argument(x.astype(np.float32))
def _mgx_shape_to_numpy(shape):
# 将 migraphx input shape 映射到 numpy dtype + lens 以生成零填充张量
shape_str = str(shape)
if "int64_type" in shape_str:
dtype = np.int64
......@@ -43,200 +83,304 @@ def _mgx_shape_to_numpy(shape):
lens = list(shape.lens())
except Exception:
lens = []
# 优先用 dims,dims 为空时才退化到 lens
return dtype, (dims if len(dims) > 0 else lens)
# =========================
# 🚀 MIGraphX 推理类(带缓存与生命周期管理
# 🚀 MIGraphX 推理类(带缓存)
# =========================
class MIGraphXModel:
def __init__(self, onnx_path, cache_path="weights/ground_opt.mxr", force_recompile=False, device_id=0):
def __init__(self, onnx_path, cache_path="../weights/ground_opt_0506.mxr", force_recompile=False):
self.cache_path = cache_path
# ====== 优先加载缓存 ======
if os.path.exists(cache_path) and not force_recompile:
print(f"⚡ 直接加载已编译模型: {cache_path}")
self.model = migraphx.load(cache_path)
else:
print("🔍 从 ONNX 构建 MIGraphX")
self.model = migraphx.parse_onnx(onnx_path)
# print(self.model)
# ====================== 2. 打印模型输入输出信息 ======================
print("=== 模型输入信息 ===")
inputs = self.model.get_inputs()
for key, value in inputs.items():
print(f"{key}: {value}")
print(f"⚙️ 编译 MIGraphX(GPU {device_id})")
self.model.compile(t=migraphx.get_target("gpu"), device_id=device_id)
print("\n=== 模型输出信息 ===")
outputs = self.model.get_outputs()
for key, value in outputs.items():
print(f"{key}: {value}")
print("⚙️ 编译 MIGraphX(GPU)")
self.model.compile(
t=migraphx.get_target("gpu"), device_id=3, offload_copy=True
)
# ====== 保存缓存 ======
print(f"💾 保存编译模型到: {cache_path}")
migraphx.save(self.model, cache_path)
self.param_names = self.model.get_parameter_names()
self.input_shapes = self.model.get_inputs()
print("✅ param_names:", self.param_names)
print("✅ input_shape:", self.input_shapes)
try:
self.output_shapes = self.model.get_outputs()
print("✅ output_shapes keys:", list(self.output_shapes.keys()))
except Exception:
self.output_shapes = None
def infer(self, input_dict):
# 只按模型 get_inputs() 定义的输入签名来组装
mgx_inputs = {}
# 【关键修复区】:用于保持 NumPy 数组存活,防止 Python 垃圾回收导致底层指针失效
self._keep_alive_cache = {}
provided_names = set(input_dict.keys())
# 某些 mxr 会把内部输出别名也暴露到 get_parameter_names/get_inputs 里,
# 这里显式排除 main:#output_*,避免把内部输出当成输入填充。
required_names = {
k for k in self.input_shapes.keys()
if not str(k).startswith("main:#output")
}
for name in required_names:
shape = self.input_shapes[name]
target_dtype, lens = _mgx_shape_to_numpy(shape)
if name in provided_names:
# 1. 必须转为连续内存!防止 PyTorch 转过来的 array 内存步长不一致
arr = np.ascontiguousarray(input_dict[name])
# 2. 强制类型转换
if arr.dtype != target_dtype:
arr = arr.astype(target_dtype)
else:
# 缺失的输入用 0 补齐
arr = np.zeros(lens, dtype=target_dtype)
# 3. 将数组塞进字典,强行续命!
self._keep_alive_cache[name] = arr
# 4. 安全地将指针移交给 migraphx
mgx_inputs[name] = migraphx.argument(arr)
missing = required_names - provided_names
if missing:
print("⚠️ 缺失模型输入,准备按 shape 自动补齐:")
for name in sorted(missing):
shape = self.input_shapes[name]
dtype, lens = _mgx_shape_to_numpy(shape)
mgx_inputs[name] = to_mgx(np.zeros(lens, dtype=dtype))
print(f" - {name}: shape={lens}, dtype={dtype.__name__}")
for name in (required_names & provided_names):
mgx_inputs[name] = to_mgx(input_dict[name])
# 额外的 key 不喂给模型,避免和内部签名冲突
extra = provided_names - required_names
if extra:
print("ℹ️ 有多余输入参数将被忽略:")
for name in sorted(extra):
print(f" - {name}")
start = time.time()
result = self.model.run(mgx_inputs)
infer_time = time.time() - start
outputs = [np.array(r) for r in result]
# 推理结束,释放内存
self._keep_alive_cache.clear()
return outputs, infer_time
# =========================
# 推理函数 (硬编码输入,无 Tokenizer)
# 推理函数
# =========================
def predict(model, image, box_threshold, is_benchmark=False):
def predict(
model,
image,
caption,
box_threshold,
text_threshold,
is_benchmark=False
):
# 提前针对car .生成对应输入
input_dict = {
"img": np.expand_dims(np.asarray(image), axis=0),
"position_ids": np.array([[0, 0, 1, 0]]),
"input_ids": np.array([[101, 2482, 1012, 102]]),
"token_type_ids": np.array([[0, 0, 0, 0]]),
"img": np.expand_dims(np.asarray(image), axis=0).astype(np.float32),
"position_ids": np.array([[0, 0, 1, 0]], dtype=np.int64),
"input_ids": np.array([[101, 2482, 1012, 102]], dtype=np.int64),
"token_type_ids": np.array([[0, 0, 0, 0]], dtype=np.int64),
"text_token_mask": np.array([[
[True, False, False, False],
[False, True, True, False],
[False, True, True, False],
[False, False, False, True]
]]),
"attention_mask": np.array([[True, True, True, True]])
]], dtype=np.bool_),
"attention_mask": np.array([[True, True, True, True]], dtype=np.bool_)
}
outputs, infer_time = model.infer(input_dict)
if not is_benchmark:
print(f"Inference time: {infer_time*1000:.2f} ms")
print(f"Inference time: {infer_time:.3f}s")
logits = sigmoid(outputs[0][0])
boxes = outputs[1][0]
t0 = time.time()
prediction_logits = sigmoid(outputs[0][0])
prediction_boxes = outputs[1][0]
post_time = time.time() - t0
max_values = np.max(logits, axis=1)
if not is_benchmark:
print(f"post time: {post_time:.3f}s")
print(f"\n=== Debug Info ===")
print(f"Prediction logits shape: {prediction_logits.shape}")
print(f"Prediction boxes shape: {prediction_boxes.shape}")
print(f"Max logit value: {np.max(prediction_logits):.4f}")
print(f"Mean logit value: {np.mean(prediction_logits):.4f}")
max_values = np.max(prediction_logits, axis=1)
mask = max_values > box_threshold
logits = logits[mask]
boxes = boxes[mask]
phrases = ["car"] * len(boxes)
logits = prediction_logits[mask]
boxes = prediction_boxes[mask]
tokens = text_cache['tokens']
input_ids = text_cache['input_ids'][0].tolist()
if remove_combined:
sep_idx = [i for i in range(len(input_ids)) if input_ids[i] in [101, 102, 1012]]
phrases = []
for logit in logits:
max_idx = logit.argmax()
insert_idx = bisect.bisect_left(sep_idx, max_idx)
right_idx = sep_idx[insert_idx]
left_idx = sep_idx[insert_idx - 1]
phrases.append(
get_phrases_from_posmap(logit > text_threshold, tokens, left_idx, right_idx)
)
else:
phrases = [
get_phrases_from_posmap(logit > text_threshold, tokens)
for logit in logits
]
return boxes, np.max(logits, axis=1), phrases
# =========================
# Benchmark
# Benchmark (完全移植 ORT 格式)
# =========================
def benchmark(model, image, box_th, warmup=5, runs=10):
print("\n🔥 预热")
for _ in range(warmup):
predict(model, image, box_th, True)
print("\n🚀 测试")
times = []
for i in range(runs):
start = time.time()
predict(model, image, box_th, True)
times.append(time.time() - start)
def benchmark_performance(
model, image, caption, box_threshold, text_threshold,
warmup_runs=5, test_runs=10
):
"""
性能测试函数:包含预热和实际推理
"""
print("="*60)
print("📊 开始性能测试(包含预热+实际推理)")
print("="*60)
print(f"\n🔥 预热阶段({warmup_runs} 次)- 不计入性能统计")
warmup_start = time.time()
for i in range(warmup_runs):
t0 = time.time()
predict(model, image, caption, box_threshold, text_threshold, is_benchmark=True)
warmup_time = time.time() - t0
print(f"预热 {i+1}/{warmup_runs} - 耗时: {warmup_time*1000:.2f} ms")
total_warmup_time = time.time() - warmup_start
print(f"\n预热完成 - 总耗时: {total_warmup_time:.3f} s, 平均每次: {total_warmup_time/warmup_runs*1000:.2f} ms")
print(f"\n🚀 实际推理测试阶段({test_runs} 次)- 统计性能指标")
test_start = time.time()
infer_times = []
for i in range(test_runs):
t0 = time.time()
predict(model, image, caption, box_threshold, text_threshold, is_benchmark=True)
infer_time = time.time() - t0
infer_times.append(infer_time)
print(f"实际推理 {i+1}/{test_runs} - 耗时: {infer_time*1000:.2f} ms")
# 计算性能指标
total_test_time = time.time() - test_start
avg_infer_time = np.mean(infer_times)
std_infer_time = np.std(infer_times)
max_infer_time = np.max(infer_times)
min_infer_time = np.min(infer_times)
fps = test_runs / total_test_time
# 输出性能报告
print("\n" + "="*60)
print("📈 性能测试报告(仅实际推理阶段)")
print("="*60)
print(f"测试次数: {test_runs} 次")
print(f"总推理耗时: {total_test_time:.3f} s")
print(f"平均推理耗时: {avg_infer_time*1000:.2f} ms (±{std_infer_time*1000:.2f} ms)")
print(f"最大推理耗时: {max_infer_time*1000:.2f} ms")
print(f"最小推理耗时: {min_infer_time*1000:.2f} ms")
print(f"平均FPS: {fps:.2f} 帧/秒")
print("="*60)
print(f"\n平均耗时: {np.mean(times)*1000:.2f} ms")
print(f"FPS: {1/np.mean(times):.2f}")
return {
"warmup_runs": warmup_runs,
"test_runs": test_runs,
"avg_infer_time_ms": avg_infer_time*1000,
"std_infer_time_ms": std_infer_time*1000,
"max_infer_time_ms": max_infer_time*1000,
"min_infer_time_ms": min_infer_time*1000,
"fps": fps
}
# =========================
# 主函数
# =========================
# if __name__ == "__main__":
# model_path = "../weights/ground_opt.onnx"
# cache_path = "../weights/ground_opt.mxr"
# img_path = "../images/in/car_1.jpg"
# BOX_TRESHOLD = 0.35
# DEVICE_ID = 5 # 匹配你之前报错堆栈里的 device: 5 / 0 的情况,按需修改
if __name__ == "__main__":
# model = MIGraphXModel(
# model_path,
# cache_path=cache_path,
# force_recompile=False,
# device_id=DEVICE_ID
# )
model_path = "../weights/ground_opt_0430.onnx"
cache_path = "../weights/ground_opt_0506.mxr" # ⭐ 缓存文件
# image_source, image = load_image(img_path)
img_path = "../images/in/car_1.jpg"
# benchmark(model, image, BOX_TRESHOLD)
TEXT_PROMPT = "car ."
BOX_TRESHOLD = 0.35
TEXT_TRESHOLD = 0.25
WARMUP_RUNS = 5
TEST_RUNS = 10
# 🚀 加载模型(自动缓存)
model = MIGraphXModel(
model_path,
cache_path=cache_path,
force_recompile=False # 改成 True 可强制重编译
)
# boxes, confs, phrases = predict(model, image, BOX_TRESHOLD)
image_source, image = load_image(img_path)
# print("检测结果:", phrases)
# 第一步:运行完整的性能测试(预热+实际推理)
benchmark_performance(
model, image, TEXT_PROMPT,
BOX_TRESHOLD, TEXT_TRESHOLD,
WARMUP_RUNS, TEST_RUNS
)
def test_like_perf(model):
# 第二步:执行最终推理并画图保存
print("\n" + "="*60)
print("🛠️ 模拟 perf 工具:生成完美对齐的 Dummy 数据测试")
print("🎯 执行最终推理(带详细日志+保存结果)")
print("="*60)
mgx_inputs = {}
keep_alive_cache = [] # 强行续命池
boxes, confs, phrases = predict(
model, image,
TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD
)
# 绘制并保存结果图片
print("\n🎯 执行最终推理并保存结果图")
ori_img = cv2.imread(img_path)
img_h = ori_img.shape[0]
img_w = ori_img.shape[1]
# 1. 严格按照模型要求的形状造假数据
for name, shape in model.get_inputs().items():
if str(name).startswith("main:#output"):
continue
# 解析真实需要的类型和形状
target_dtype, lens = _mgx_shape_to_numpy(shape)
print(f" 📦 分配 {name}: shape={lens}, dtype={target_dtype.__name__}")
# 生成分毫不差的全零矩阵(完美模拟 migraphx-driver)
dummy_data = np.zeros(lens, dtype=target_dtype)
keep_alive_cache.append(dummy_data)
for i in range(len(boxes)):
one_box = boxes[i]
one_conf = confs[i]
one_cls = phrases[i]
# 移交指针
mgx_inputs[name] = migraphx.argument(dummy_data)
x1 = int((one_box[0] - one_box[2] / 2) * img_w)
y1 = int((one_box[1] - one_box[3] / 2) * img_h)
x2 = int((one_box[0] + one_box[2] / 2) * img_w)
y2 = int((one_box[1] + one_box[3] / 2) * img_h)
print("\n🚀 开始 Dummy 推理测试...")
try:
start = time.time()
model.run(mgx_inputs)
print(f"✅ Python 端 Dummy 推理成功!没有任何 VMFault!耗时: {(time.time()-start)*1000:.2f}ms")
except Exception as e:
print(f"❌ 依然报错: {e}")
# ------------------
# 在主函数里这样调用:
# ------------------
if __name__ == "__main__":
model_path = "../weights/ground_opt.onnx"
cache_path = "../weights/ground_opt.mxr"
model = migraphx.load(cache_path) # 直接加载你确定没问题的 mxr
# 运行模拟测试
test_like_perf(model)
\ No newline at end of file
cv2.rectangle(ori_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
cv2.putText(
ori_img, f'{one_cls} {one_conf:.2f}',
(x1-15, y1-15),
fontFace=cv2.FONT_HERSHEY_SIMPLEX,
color=(255, 255, 255),
fontScale=1.5,
thickness=3
)
# 保存结果
cv2.imwrite('../weights/result_migraphx.jpg', ori_img)
print(f"\n✅ 结果已保存至: ../weights/result_migraphx.jpg")
print(f"✅ 检测到目标: {phrases} (共 {len(boxes)} 个)")
\ No newline at end of file
import cv2
import numpy as np
import migraphx
"""
本示例演示了如何使用migraphx进行推理,主要步骤如下:
1. 加载模型
2. 获取模型输入输出节点信息
3. 编译模型
4. 为输出节点分配device内存,用于保存输出数据
5. 预处理并转换为NCHW
6. 将输入数据转换为device数据作为输入数据
7. 推理
"""
def ReadImage(pathOfImage,inputShape):
    """Read an image and build a normalized float32 batch of shape ``inputShape``.

    The image is read with ``cv2.imread(..., cv2.IMREAD_COLOR)``, resized to
    ``(inputShape[3], inputShape[2])`` (width, height), moved to
    channel-first layout and normalized per channel as
    ``(pixel - 127.5) * 0.0078125``. The result fills batch entry 0 and is
    copied into every remaining batch entry.

    Args:
        pathOfImage: Path of the image file.
        inputShape: Four-element shape, indexed as (N, C, H, W).

    Returns:
        float32 NumPy array of shape ``inputShape``.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    srcImage = cv2.imread(pathOfImage, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None rather than raising.
    if srcImage is None:
        raise FileNotFoundError("cannot read image: {}".format(pathOfImage))

    # Resize, convert to float32, then reorder HWC -> CHW.
    resizedImage = cv2.resize(srcImage, (inputShape[3], inputShape[2]))
    srcImage_CHW = np.transpose(resizedImage.astype("float32"), (2, 0, 1))

    # Per-channel constants, shaped (C, 1, 1) so they broadcast over H and W.
    mean = np.array([127.5, 127.5, 127.5], dtype="float32").reshape(-1, 1, 1)
    scale = np.array([0.0078125, 0.0078125, 0.0078125], dtype="float32").reshape(-1, 1, 1)

    inputData = np.zeros(inputShape).astype("float32")  # NCHW
    inputData[0, :srcImage_CHW.shape[0]] = (srcImage_CHW - mean) * scale
    # Every further batch entry is a copy of entry 0.
    inputData[1:] = inputData[0]
    return inputData
def AllocateOutputMemory(model):
    """Allocate one GPU buffer per model output.

    For every key in ``model.get_outputs()`` a buffer is created with
    ``migraphx.allocate_gpu`` using that output's shape.

    Returns:
        Dict mapping each output key to its allocated buffer.
    """
    outputShapes = model.get_outputs()
    return {
        name: migraphx.allocate_gpu(s=outputShapes[name])
        for name in outputShapes.keys()
    }
if __name__ == '__main__':
    # Load the model from its ONNX file.
    model = migraphx.parse_onnx("ResNet50.onnx")

    # Print the model's input and output node information.
    print("inputs:")
    inputs = model.get_inputs()
    for nodeName, nodeShape in inputs.items():
        print("{}:{}".format(nodeName, nodeShape))
    print("outputs:")
    outputs = model.get_outputs()
    for nodeName, nodeShape in outputs.items():
        print("{}:{}".format(nodeName, nodeShape))

    # The first input node and its dimensions drive the preprocessing.
    inputName = list(model.get_inputs().keys())[0]
    inputShape = inputs[inputName].lens()

    # Compile for the GPU target with offload_copy=False; the script itself
    # moves data with to_gpu / from_gpu below.
    model.compile(
        t=migraphx.get_target("gpu"),
        offload_copy=False,
        device_id=0,
    )

    # Allocate device memory that will hold the output node data.
    modelData = AllocateOutputMemory(model)

    # Preprocess the image into an NCHW batch.
    pathOfImage = "Test.jpg"
    image = ReadImage(pathOfImage, inputShape)

    # Convert the input to device data and register it under the input name.
    modelData[inputName] = migraphx.to_gpu(migraphx.argument(image))

    # Run inference.
    results = model.run(modelData)

    # Copy the first output node back to the host (a migraphx.argument).
    result = migraphx.from_gpu(results[0])
    outputShape = result.get_shape()           # migraphx.shape of the output node
    outputSize = outputShape.lens()            # size of each dimension, as a list
    numberOfOutput = outputShape.elements()    # number of elements in the output

    # Convert to NumPy and print.
    result = np.array(result)
    print(result)
migraphx-driver perf --batch 1 \
-n 10 \
--fp16 \
--migraphx ../weights/ground_opt.mxr
\ No newline at end of file
--migraphx ../weights/ground_opt_0430.mxr
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment