Commit 34e4011b authored by zk's avatar zk
Browse files

首次提交

parents
Pipeline #3503 failed with stages
in 0 seconds
import onnx
from onnx import helper

# Source (simplified) model and destination path for the patched copy.
INPUT_MODEL = "weights/ground_simplified.onnx"
OUTPUT_MODEL = "weights/ground_fix.onnx"
def add_identity(graph, input_name, suffix, new_nodes, processed):
    """Shield *input_name* behind an Identity node to block constant folding.

    Returns the renamed tensor name (input_name + suffix). The Identity
    node is collected into *new_nodes*; an input already listed in
    *processed* is not wrapped a second time.
    """
    renamed = input_name + suffix
    if input_name in processed:
        return renamed
    new_nodes.append(
        helper.make_node(
            "Identity",
            inputs=[input_name],
            outputs=[renamed],
            name=renamed + "_identity",
        )
    )
    processed.add(input_name)
    return renamed
def patch_model(model):
    """Insert Identity nodes in front of Gather / Scatter* / Where inputs.

    The Identity wrappers prevent downstream toolchains from constant-folding
    these ops. All created nodes are inserted at the head of the graph so
    they are defined before any consumer.
    """
    graph = model.graph
    inserted = []
    seen = set()
    for node in graph.node:
        op = node.op_type
        # 1. Gather: shield the index input.
        if op == "Gather":
            node.input[1] = add_identity(graph, node.input[1], "_block", inserted, seen)
        # 2. ScatterND (and any Scatter variant): shield data, indices, updates.
        if op.lower().startswith("scatter"):
            for slot in range(3):
                node.input[slot] = add_identity(graph, node.input[slot], "_block", inserted, seen)
        # 3. Where: all three operands may also trigger constant folding.
        if op == "Where":
            for slot in range(3):
                node.input[slot] = add_identity(graph, node.input[slot], "_block", inserted, seen)
    # Prepend the new Identity nodes, preserving their creation order.
    for pos, identity in enumerate(inserted):
        graph.node.insert(pos, identity)
    return model
def main():
    """Load INPUT_MODEL, block constant folding, and save to OUTPUT_MODEL."""
    print("🔍 加载模型...")
    original = onnx.load(INPUT_MODEL)
    print("⚙️ 全面阻断 constant folding(Gather + ScatterND + Where)...")
    patched = patch_model(original)
    print("💾 保存模型...")
    onnx.save(patched, OUTPUT_MODEL)
    print("✅ 完成:", OUTPUT_MODEL)


if __name__ == "__main__":
    main()
\ No newline at end of file
import onnx
import numpy as np  # FIX: np was used below but never imported (NameError)
from onnx import numpy_helper

# Audit script: scan constant initializers of the exported model for
# integer index tensors with suspicious values (negative or implausibly
# large), which would explain bad Gather/Scatter behavior after export.
model = onnx.load("weights/ground.onnx")
for init in model.graph.initializer:
    if "Constant" in init.name:
        arr = numpy_helper.to_array(init)
        # Only integer tensors can be indices.
        if arr.dtype in [np.int32, np.int64]:
            if (arr < 0).any() or (arr > 10000).any():
                print("🚨 可疑 index:", init.name, arr)
"""Audit every Gather index stored as an initializer in the exported graph."""
import onnx
import numpy as np
from onnx import numpy_helper

model = onnx.load("weights/ground.onnx")
# Map each initializer name to its decoded numpy array.
init_map = {init.name: numpy_helper.to_array(init) for init in model.graph.initializer}
for node in model.graph.node:
    if node.op_type != "Gather":
        continue
    index_name = node.input[1]
    if index_name not in init_map:
        continue
    idx = init_map[index_name]
    print("\n🚨 Gather index:", index_name)
    print("dtype:", idx.dtype)
    print("min:", idx.min())
    print("max:", idx.max())
    print("shape:", idx.shape)
    if (idx < 0).any():
        print("❌ NEGATIVE index")
    if (idx > 10000).any():
        print("❌ SUSPICIOUS LARGE index")
from typing import Tuple, List, Dict
import cv2
import numpy as np
import torch
import onnxruntime as ort
from transformers import BertTokenizer, AutoTokenizer
import bisect
import time
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
def sigmoid(x):
    """Element-wise logistic function: map logits into (0, 1)."""
    return 1.0 / (1.0 + np.exp(-x))
def get_phrases_from_posmap(
    posmap: np.ndarray, tokenized: Dict, tokenizer: AutoTokenizer, left_idx: int = 0, right_idx: int = 255
):
    """Decode the phrase whose token positions are flagged True in *posmap*.

    NOTE(review): mutates *posmap* in place (positions <= left_idx and
    >= right_idx are cleared); callers here always pass a freshly built
    boolean array (logit > threshold), so this is currently harmless.
    """
    assert isinstance(posmap, np.ndarray), "posmap must be np.ndarray"
    if posmap.ndim == 1:
        # Clear everything outside (left_idx, right_idx) so only the
        # current text segment's tokens survive.
        posmap[:left_idx + 1] = False
        posmap[right_idx:] = False
        # Indices of the still-active token positions.
        non_zero_idx = np.nonzero(posmap)[0]
        token_ids = [tokenized["input_ids"][i] for i in non_zero_idx]
        return tokenizer.decode(token_ids)
    else:
        raise NotImplementedError("posmap must be 1-dim")
def preprocess_caption(caption: str) -> str:
    """Normalize a prompt: lower-case, trim, and guarantee a trailing '.'."""
    normalized = caption.lower().strip()
    if not normalized.endswith("."):
        normalized += "."
    return normalized
def predict(
    ort_session,
    # ort_session_gpu,
    # ort_session_cpu,
    image: np.array,
    caption: str,
    box_threshold: float,
    text_threshold: float,
    device: str = "cpu",
    remove_combined: bool = False
) -> Tuple[torch.Tensor, torch.Tensor, List[str]]:
    """Run GroundingDINO open-vocabulary detection via an ONNX Runtime session.

    Returns (boxes, confidences, phrases) for detections whose best logit
    exceeds box_threshold. Boxes are in normalized (cx, cy, w, h) form
    (see the drawing code in __main__ — TODO confirm against the export).
    """
    # 1. Text preprocessing
    t0 = time.time()
    caption = preprocess_caption(caption=caption)  # normalize: lower-case, strip, ensure trailing "."
    print(f"Caption processing took {(time.time() - t0):.3f}s")
    # # 2. (disabled) move model and data to device
    # model = model.to(device)
    # image = image.to(device)
    captions = [caption]
    # 3. Encode the text
    # Tokenize the caption and convert it to tensors
    t0 = time.time()
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    print(f"Loaded BERT tokenizer took {(time.time() - t0):.3f}s")
    t0 = time.time()
    tokenized = tokenizer(captions, padding="longest", return_tensors="pt").to(device)  # padding="longest" aligns shorter sentences within a batch
    specical_tokens = tokenizer.convert_tokens_to_ids (["[CLS]", "[SEP]", ".", "?"])  # map special tokens ([CLS], [SEP], ...) to their vocabulary ids
    print(f"Word embedding took {(time.time() - t0):.3f}s")
    # 4. Build attention masks and position information
    # Self-attention masks, position ids and the category-to-token map used
    # to control attention inside the Transformer.
    t0 = time.time()
    (
        text_self_attention_masks,
        position_ids,
        cate_to_token_mask_list,
    ) = generate_masks_with_special_tokens_and_transfer_map(
        tokenized, specical_tokens, tokenizer)
    print(f"Generate attention masks took {(time.time() - t0):.3f}s")
    # 5. Handle over-long text
    max_text_len = 256
    # If the caption exceeds max_text_len, truncate ids, attention mask and token type ids.
    if text_self_attention_masks.shape[1] > max_text_len:
        text_self_attention_masks = text_self_attention_masks[
            :, : max_text_len, : max_text_len]
        position_ids = position_ids[:, : max_text_len]
        tokenized["input_ids"] = tokenized["input_ids"][:, : max_text_len]
        tokenized["attention_mask"] = tokenized["attention_mask"][:, : max_text_len]
        tokenized["token_type_ids"] = tokenized["token_type_ids"][:, : max_text_len]
    # 6. Run model inference
    attention_mask = np.asarray(tokenized["attention_mask"]).astype(bool)
    input_dict = {"img": np.expand_dims(np.asarray(image), axis=0),"input_ids": np.asarray(tokenized["input_ids"]), "attention_mask": attention_mask,
                  "position_ids": np.asarray(position_ids), "token_type_ids": np.asarray(tokenized["token_type_ids"]), "text_token_mask": np.asarray(text_self_attention_masks)}
    """
    (Pdb) input_dict["img"].shape
    (1, 3, 800, 1200)
    (Pdb) input_dict["input_ids"].shape
    (1, 4)
    (Pdb) input_dict["position_ids"].shape
    (1, 4)
    (Pdb) input_dict["token_type_ids"].shape
    (1, 4)
    (Pdb) input_dict["attention_mask"].shape
    (1, 4)
    (Pdb) input_dict["text_token_mask"].shape
    (1, 4, 4)
    """
    t0 = time.time()
    outputs = ort_session.run(['logits', 'boxes'], input_dict)
    # import pdb;pdb.set_trace()
    print(f"Inference time: {(time.time() - t0):.3f}s")
    # 7. Collect predictions
    prediction_logits = np.apply_along_axis(sigmoid, -1, outputs[0][0])
    # prediction_logits = outputs[0].sigmoid()[0] # prediction_logits.shape = (nq, 256)
    prediction_boxes = outputs[1][0]  # prediction_boxes.shape = (nq, 4)
    print(f"\n=== Debug Info ===")
    print(f"Prediction logits shape: {prediction_logits.shape}")
    print(f"Prediction boxes shape: {prediction_boxes.shape}")
    print(f"Max logit value: {np.max(prediction_logits):.4f}")
    print(f"Mean logit value: {np.mean(prediction_logits):.4f}")
    # 8. Apply filtering thresholds
    # Row-wise maximum logit per query.
    max_values = np.max(prediction_logits, axis=1)
    # Keep queries whose best score clears the box threshold.
    mask = max_values > box_threshold
    # mask = prediction_logits.max(dim=1)[0] > box_threshold
    logits = prediction_logits[mask]  # logits.shape = (n, 256)
    boxes = prediction_boxes[mask]  # boxes.shape = (n, 4)
    # 9. Re-tokenize the caption for phrase extraction
    # tokenizer = model.tokenizer
    tokenized = tokenizer(caption)
    # 10. Handle special tokens
    # With remove_combined, split at [SEP]-like tokens so each phrase maps to
    # a single caption segment; otherwise decode directly from the posmap.
    # get_phrases_from_posmap: extract the matching phrase from the text probability map.
    if remove_combined:
        sep_idx = [i for i in range(len(tokenized['input_ids'])) if tokenized['input_ids'][i] in [101, 102, 1012]]
        phrases = []
        for logit in logits:
            max_idx = logit.argmax()
            insert_idx = bisect.bisect_left(sep_idx, max_idx)
            right_idx = sep_idx[insert_idx]
            left_idx = sep_idx[insert_idx - 1]
            phrases.append \
                (get_phrases_from_posmap(logit > text_threshold, tokenized, tokenizer, left_idx, right_idx).replace('.', ''))
    else:
        phrases = [
            get_phrases_from_posmap(logit > text_threshold, tokenized, tokenizer).replace('.', '')
            for logit
            in logits
        ]
    return boxes, np.max(logits, axis=1), phrases
if __name__ == '__main__':
    # Configuration
    model_path = 'weights/ground.onnx'
    img_path = 'images/in/car_1.jpg'
    TEXT_PROMPT = "car ."
    BOX_TRESHOLD = 0.35
    TEXT_TRESHOLD = 0.25
    image_source, image = load_image(img_path)
    # Load the ONNX model and create the InferenceSession.
    print("Loading ONNX model")
    ort_session = ort.InferenceSession(model_path, providers=['ROCMExecutionProvider', 'CPUExecutionProvider'])
    # Report which ExecutionProvider is active (the first one).
    current_provider = ort_session.get_providers()[0]
    print("Loaded ONNX model, Current Execution Provider:", current_provider)
    boxes, confs, phrases = predict(ort_session, image, TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD)
    # Draw detections on the original-resolution image and save it.
    ori_img = cv2.imread(img_path)
    img_h = ori_img.shape[0]
    img_w = ori_img.shape[1]
    for i in range(len(boxes)):
        one_box = boxes[i]
        one_conf = confs[i]
        one_cls = phrases[i]
        # Normalized (cx, cy, w, h) -> pixel corner coordinates.
        x1 = int((one_box[0] - one_box[2] / 2) * img_w)
        y1 = int((one_box[1] - one_box[3] / 2) * img_h)
        x2 = int((one_box[0] + one_box[2] / 2) * img_w)
        y2 = int((one_box[1] + one_box[3] / 2) * img_h)
        image = cv2.rectangle(ori_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
        cv2.putText(ori_img, f'{one_cls} {one_conf:.2f}', (x1-15, y1-15), fontFace = cv2.FONT_HERSHEY_SIMPLEX, color = (255, 255, 255), fontScale=1.5, thickness=3)
    cv2.imwrite('./images/out/result.jpg', ori_img)
\ No newline at end of file
from typing import Tuple, List, Dict
import cv2
import numpy as np
import torch
import onnxruntime as ort
from transformers import BertTokenizer, AutoTokenizer
import bisect
import time
import os
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
# Adds inference-latency metrics (per-stage timing and benchmark support).
def sigmoid(x):
    """Logistic squashing of raw logits into probabilities."""
    neg_exp = np.exp(-x)
    return 1 / (1 + neg_exp)
def get_phrases_from_posmap(
    posmap: np.ndarray, tokenized: Dict, tokenizer: AutoTokenizer, left_idx: int = 0, right_idx: int = 255
):
    """Decode the phrase whose token positions are flagged True in *posmap*.

    NOTE(review): mutates *posmap* in place; callers pass a fresh boolean
    array (logit > threshold) each time, so no state leaks between calls.
    """
    assert isinstance(posmap, np.ndarray), "posmap must be np.ndarray"
    if posmap.ndim == 1:
        # Clear positions outside (left_idx, right_idx).
        posmap[:left_idx + 1] = False
        posmap[right_idx:] = False
        # Indices of the remaining active token positions.
        non_zero_idx = np.nonzero(posmap)[0]
        token_ids = [tokenized["input_ids"][i] for i in non_zero_idx]
        return tokenizer.decode(token_ids)
    else:
        raise NotImplementedError("posmap must be 1-dim")
def preprocess_caption(caption: str) -> str:
    """Lower-case and trim *caption*, guaranteeing it ends with a period."""
    text = caption.lower().strip()
    return text if text.endswith(".") else text + "."
# Core optimization: the tokenizer is injected by the caller (loaded once).
def predict(
    ort_session,
    tokenizer: AutoTokenizer,  # externally pre-loaded tokenizer
    image: np.array,
    caption: str,
    box_threshold: float,
    text_threshold: float,
    device: str = "cpu",
    remove_combined: bool = False,
    is_benchmark: bool = False,  # benchmark mode: suppresses verbose logging
    save_npy: bool = False
) -> Tuple[torch.Tensor, torch.Tensor, List[str]]:
    """Single-image GroundingDINO inference through an ONNX Runtime session.

    Returns (boxes, confidences, phrases) for detections whose best logit
    exceeds box_threshold. Optionally dumps all model inputs/outputs to
    npy_io/ when save_npy is set (and not benchmarking).
    """
    # 1. Text preprocessing
    t0 = time.time()
    caption = preprocess_caption(caption=caption)
    if not is_benchmark:
        print(f"Caption processing took {(time.time() - t0):.3f}s")
    captions = [caption]
    # 3. Encode the text
    t0 = time.time()
    # Per-call tokenizer reload removed here — it was a performance sink.
    tokenized = tokenizer(captions, padding="longest", return_tensors="pt").to(device)
    specical_tokens = tokenizer.convert_tokens_to_ids (["[CLS]", "[SEP]", ".", "?"])
    if not is_benchmark:
        print(f"Word embedding took {(time.time() - t0):.3f}s")
    # 4. Build attention masks and position information
    t0 = time.time()
    (
        text_self_attention_masks,
        position_ids,
        cate_to_token_mask_list,
    ) = generate_masks_with_special_tokens_and_transfer_map(
        tokenized, specical_tokens, tokenizer)
    if not is_benchmark:
        print(f"Generate attention masks took {(time.time() - t0):.3f}s")
    # 5. Truncate over-long text
    max_text_len = 256
    if text_self_attention_masks.shape[1] > max_text_len:
        text_self_attention_masks = text_self_attention_masks[
            :, : max_text_len, : max_text_len]
        position_ids = position_ids[:, : max_text_len]
        tokenized["input_ids"] = tokenized["input_ids"][:, : max_text_len]
        tokenized["attention_mask"] = tokenized["attention_mask"][:, : max_text_len]
        tokenized["token_type_ids"] = tokenized["token_type_ids"][:, : max_text_len]
    # 6. Run model inference
    attention_mask = np.asarray(tokenized["attention_mask"]).astype(bool)
    input_dict = {
        "img": np.expand_dims(np.asarray(image), axis=0),
        "input_ids": np.asarray(tokenized["input_ids"]),
        "attention_mask": attention_mask,
        "position_ids": np.asarray(position_ids),
        "token_type_ids": np.asarray(tokenized["token_type_ids"]),
        "text_token_mask": np.asarray(text_self_attention_masks)
    }
    # ===================== Save model inputs as .npy =====================
    if save_npy and not is_benchmark:
        save_dir = "npy_io"
        os.makedirs(save_dir, exist_ok=True)
        # Save every input tensor
        np.save(f"{save_dir}/input_img.npy", input_dict["img"])
        np.save(f"{save_dir}/input_input_ids.npy", input_dict["input_ids"])
        np.save(f"{save_dir}/input_attention_mask.npy", input_dict["attention_mask"])
        np.save(f"{save_dir}/input_position_ids.npy", input_dict["position_ids"])
        np.save(f"{save_dir}/input_token_type_ids.npy", input_dict["token_type_ids"])
        np.save(f"{save_dir}/input_text_token_mask.npy", input_dict["text_token_mask"])
        print(f"\n✅ 模型输入已保存到 {save_dir}/ 文件夹")
    # ====================================================================
    t0 = time.time()
    outputs = ort_session.run(['logits', 'boxes'], input_dict)
    infer_time = time.time() - t0
    if not is_benchmark:
        print(f"Inference time: {infer_time:.3f}s")
    # ===================== Save model outputs as .npy =====================
    if save_npy and not is_benchmark:
        np.save(f"{save_dir}/output_logits.npy", outputs[0])
        np.save(f"{save_dir}/output_boxes.npy", outputs[1])
        print(f"✅ 模型输出已保存到 {save_dir}/ 文件夹")
    # ====================================================================
    # 7. Collect predictions
    prediction_logits = np.apply_along_axis(sigmoid, -1, outputs[0][0])
    prediction_boxes = outputs[1][0]
    if not is_benchmark:
        print(f"\n=== Debug Info ===")
        print(f"Prediction logits shape: {prediction_logits.shape}")
        print(f"Prediction boxes shape: {prediction_boxes.shape}")
        print(f"Max logit value: {np.max(prediction_logits):.4f}")
        print(f"Mean logit value: {np.mean(prediction_logits):.4f}")
    # 8. Apply score filtering
    max_values = np.max(prediction_logits, axis=1)
    mask = max_values > box_threshold
    logits = prediction_logits[mask]
    boxes = prediction_boxes[mask]
    # 9. Re-tokenize the caption for phrase extraction
    tokenized = tokenizer(caption)
    # 10. Handle special tokens
    if remove_combined:
        # Split at [CLS]/[SEP]/'.' token ids so each phrase maps to one segment.
        sep_idx = [i for i in range(len(tokenized['input_ids'])) if tokenized['input_ids'][i] in [101, 102, 1012]]
        phrases = []
        for logit in logits:
            max_idx = logit.argmax()
            insert_idx = bisect.bisect_left(sep_idx, max_idx)
            right_idx = sep_idx[insert_idx]
            left_idx = sep_idx[insert_idx - 1]
            phrases.append(
                get_phrases_from_posmap(logit > text_threshold, tokenized, tokenizer, left_idx, right_idx).replace('.', '')
            )
    else:
        phrases = [
            get_phrases_from_posmap(logit > text_threshold, tokenized, tokenizer).replace('.', '')
            for logit in logits
        ]
    return boxes, np.max(logits, axis=1), phrases
# New: complete benchmark routine (warm-up + timed inference).
def benchmark_performance(
    ort_session, tokenizer, image, caption, box_threshold, text_threshold,
    warmup_runs=5, test_runs=10, device="cpu"
):
    """End-to-end latency benchmark: warm-up phase followed by timed runs.

    :param warmup_runs: iterations executed but excluded from the statistics
    :param test_runs: iterations whose latency is aggregated
    :return: dict with latency statistics (ms) and average FPS
    """
    print("="*60)
    print("📊 开始性能测试(包含预热+实际推理)")
    print("="*60)
    # 1. Warm-up phase (not counted in the stats)
    print(f"\n🔥 预热阶段({warmup_runs} 次)- 不计入性能统计")
    warmup_start = time.time()
    for i in range(warmup_runs):
        t0 = time.time()
        predict(ort_session, tokenizer, image, caption, box_threshold, text_threshold, device, is_benchmark=True)
        warmup_time = time.time() - t0
        print(f"预热 {i+1}/{warmup_runs} - 耗时: {warmup_time*1000:.2f} ms")
    total_warmup_time = time.time() - warmup_start
    print(f"\n预热完成 - 总耗时: {total_warmup_time:.3f} s, 平均每次: {total_warmup_time/warmup_runs*1000:.2f} ms")
    # 2. Timed inference phase
    print(f"\n🚀 实际推理测试阶段({test_runs} 次)- 统计性能指标")
    test_start = time.time()
    infer_times = []  # per-run end-to-end latency
    for i in range(test_runs):
        t0 = time.time()
        predict(ort_session, tokenizer, image, caption, box_threshold, text_threshold, device, is_benchmark=True)
        infer_time = time.time() - t0
        infer_times.append(infer_time)
        print(f"实际推理 {i+1}/{test_runs} - 耗时: {infer_time*1000:.2f} ms")
    # 3. Aggregate the metrics
    total_test_time = time.time() - test_start
    avg_infer_time = np.mean(infer_times)
    std_infer_time = np.std(infer_times)
    max_infer_time = np.max(infer_times)
    min_infer_time = np.min(infer_times)
    fps = test_runs / total_test_time
    # 4. Print the performance report
    print("\n" + "="*60)
    print("📈 性能测试报告(仅实际推理阶段)")
    print("="*60)
    print(f"测试次数: {test_runs} 次")
    print(f"总推理耗时: {total_test_time:.3f} s")
    print(f"平均推理耗时: {avg_infer_time*1000:.2f} ms (±{std_infer_time*1000:.2f} ms)")
    print(f"最大推理耗时: {max_infer_time*1000:.2f} ms")
    print(f"最小推理耗时: {min_infer_time*1000:.2f} ms")
    print(f"平均FPS: {fps:.2f} 帧/秒")
    print("="*60)
    return {
        "warmup_runs": warmup_runs,
        "test_runs": test_runs,
        "avg_infer_time_ms": avg_infer_time*1000,
        "std_infer_time_ms": std_infer_time*1000,
        "max_infer_time_ms": max_infer_time*1000,
        "min_infer_time_ms": min_infer_time*1000,
        "fps": fps
    }
if __name__ == '__main__':
    # Configuration
    model_path = 'weights/ground.onnx'
    img_path = 'images/in/car_1.jpg'
    TEXT_PROMPT = "car ."
    BOX_TRESHOLD = 0.35
    TEXT_TRESHOLD = 0.25
    DEVICE = "cpu"
    WARMUP_RUNS = 5  # warm-up iterations
    TEST_RUNS = 10  # timed iterations
    # Load the test image
    image_source, image = load_image(img_path)
    # Load the ONNX model (graph optimizations enabled)
    print("🔍 加载ONNX模型")
    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL  # enable all graph optimizations
    sess_options.log_severity_level = 3  # reduce logging noise
    # sess_options.enable_profiling = True  # enable profiling
    ort_session = ort.InferenceSession(model_path,
                                       sess_options=sess_options,
                                       providers=['ROCMExecutionProvider']
                                       # provider_options=[{
                                       #     "device_id": 0,
                                       #     "migraphx_fp16_enable": "False",
                                       #     "migraphx_int8_enable": "False",
                                       #     # try disabling MIGraphX internal optimizations
                                       #     "migraphx_save_compiled_model": "False",
                                       # }]
                                       )
    # Report the active execution provider(s)
    current_provider = ort_session.get_providers()
    print(f"✅ 模型加载完成 - 当前执行引擎: {current_provider}")
    # Pre-load the tokenizer once (key optimization over per-call loading)
    print("\n📝 预加载BERT Tokenizer(仅加载一次)")
    t0 = time.time()
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    print(f"✅ Tokenizer加载完成 - 耗时: {(time.time() - t0):.3f} s")
    # Step 1: full benchmark (warm-up + timed inference)
    performance_result = benchmark_performance(
        ort_session, tokenizer, image, TEXT_PROMPT,
        BOX_TRESHOLD, TEXT_TRESHOLD,
        WARMUP_RUNS, TEST_RUNS, DEVICE
    )
    # Step 2: one full inference with verbose logs; save the result image
    print("\n" + "="*60)
    print("🎯 执行最终推理(带详细日志+保存结果)")
    print("="*60)
    boxes, confs, phrases = predict(
        ort_session, tokenizer, image, TEXT_PROMPT,
        BOX_TRESHOLD, TEXT_TRESHOLD, DEVICE
    )
    # Draw detections and save the annotated image
    ori_img = cv2.imread(img_path)
    img_h = ori_img.shape[0]
    img_w = ori_img.shape[1]
    for i in range(len(boxes)):
        one_box = boxes[i]
        one_conf = confs[i]
        one_cls = phrases[i]
        # Normalized (cx, cy, w, h) -> pixel corner coordinates
        x1 = int((one_box[0] - one_box[2] / 2) * img_w)
        y1 = int((one_box[1] - one_box[3] / 2) * img_h)
        x2 = int((one_box[0] + one_box[2] / 2) * img_w)
        y2 = int((one_box[1] + one_box[3] / 2) * img_h)
        cv2.rectangle(ori_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
        cv2.putText(
            ori_img, f'{one_cls} {one_conf:.2f}',
            (x1-15, y1-15),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            color=(255, 255, 255),
            fontScale=1.5,
            thickness=3
        )
    # Save the result
    cv2.imwrite('./images/out/result.jpg', ori_img)
    print(f"\n✅ 结果已保存至: ./images/out/result.jpg")
    print(f"✅ 检测到目标: {phrases} (共 {len(boxes)} 个)")
    # profile_file = ort_session.end_profiling()
    # print(f"\n📊 Profiling 文件已生成: {profile_file}")
\ No newline at end of file
from typing import Tuple, List, Dict
import os
import cv2
import numpy as np
import torch
import onnxruntime as ort
from transformers import BertTokenizer, AutoTokenizer
import bisect
import time
import warnings
warnings.filterwarnings('ignore')
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
# Adds inference-latency metrics (per-stage timing and benchmark support).
def sigmoid(x):
    """Return 1 / (1 + exp(-x)) element-wise."""
    denom = np.exp(-x) + 1
    return 1 / denom
def get_phrases_from_posmap(
    posmap: np.ndarray, tokenized: Dict, tokenizer: AutoTokenizer, left_idx: int = 0, right_idx: int = 255
):
    """Decode the phrase whose token positions are flagged True in *posmap*.

    NOTE(review): mutates *posmap* in place; safe here because callers
    always build a fresh boolean array (logit > threshold) per call.
    """
    assert isinstance(posmap, np.ndarray), "posmap must be np.ndarray"
    if posmap.ndim == 1:
        # Clear positions outside (left_idx, right_idx).
        posmap[:left_idx + 1] = False
        posmap[right_idx:] = False
        # Indices of the remaining active token positions.
        non_zero_idx = np.nonzero(posmap)[0]
        token_ids = [tokenized["input_ids"][i] for i in non_zero_idx]
        return tokenizer.decode(token_ids)
    else:
        raise NotImplementedError("posmap must be 1-dim")
def preprocess_caption(caption: str) -> str:
    """Canonicalize a prompt: lower-case, strip, append '.' when absent."""
    cleaned = caption.lower().strip()
    if cleaned.endswith("."):
        return cleaned
    return cleaned + "."
# Core optimization: fixed-size buffer pool (800x1200), batch_size=1.
class HIPMemoryPool:
    """Pre-allocated, fixed-shape host buffers reused across inferences.

    Avoids per-call allocations by writing each request's image and text
    tensors into the same numpy arrays (batch size 1).
    """

    def __init__(self, img_shape=(3, 800, 1200), max_text_len=256, device="cpu"):
        self.img_shape = img_shape  # fixed CHW image shape (800x1200)
        self.max_text_len = max_text_len  # text buffers are padded to this length
        self.device = device  # stored for interface parity; buffers are host numpy
        self.pool = {}  # buffer name -> pre-allocated ndarray
        # Allocate everything up front (fixed sizes, no dynamic allocation).
        self._preallocate_all_buffers()

    def _preallocate_all_buffers(self):
        """Pre-allocate all fixed-size buffers (800x1200, batch_size=1)."""
        # Image buffer (1, 3, 800, 1200) - fixed size
        self.pool["img"] = np.zeros((1,) + self.img_shape, dtype=np.float32)
        # Text buffers (batch_size=1, 256)
        self.pool["input_ids"] = np.zeros((1, self.max_text_len), dtype=np.int64)
        self.pool["attention_mask"] = np.zeros((1, self.max_text_len), dtype=bool)
        self.pool["position_ids"] = np.zeros((1, self.max_text_len), dtype=np.int64)
        self.pool["token_type_ids"] = np.zeros((1, self.max_text_len), dtype=np.int64)
        self.pool["text_token_mask"] = np.zeros((1, self.max_text_len, self.max_text_len), dtype=bool)

    def update_img_buffer(self, image: np.array):
        """Copy *image* (CHW, must equal img_shape) into the pooled buffer."""
        # Validate the input size: only the fixed 800x1200 shape is accepted.
        if image.shape != self.img_shape:
            raise ValueError(f"图片尺寸必须为{self.img_shape},当前为{image.shape}")
        self.pool["img"][0] = image
        return self.pool["img"]

    def update_text_buffers(self, tokenized, position_ids, text_self_attention_masks):
        """Copy tokenized text tensors into the pooled buffers (truncated to max_text_len).

        NOTE(review): buffers are not zeroed between calls, so tokens from a
        longer previous caption persist past text_len — confirm the model
        masks them out via attention_mask / text_token_mask.
        """
        # Truncate and copy the text data into the pre-allocated buffers.
        text_len = min(tokenized["input_ids"].shape[1], self.max_text_len)
        self.pool["input_ids"][0, :text_len] = tokenized["input_ids"][0, :text_len].cpu().numpy()
        self.pool["attention_mask"][0, :text_len] = tokenized["attention_mask"][0, :text_len].cpu().numpy().astype(bool)
        self.pool["position_ids"][0, :text_len] = position_ids[0, :text_len].cpu().numpy()
        self.pool["token_type_ids"][0, :text_len] = tokenized["token_type_ids"][0, :text_len].cpu().numpy()
        # Text self-attention mask (square, truncated the same way).
        mask_len = min(text_self_attention_masks.shape[1], self.max_text_len)
        self.pool["text_token_mask"][0, :mask_len, :mask_len] = text_self_attention_masks[0, :mask_len, :mask_len].cpu().numpy()
        return {
            "input_ids": self.pool["input_ids"],
            "attention_mask": self.pool["attention_mask"],
            "position_ids": self.pool["position_ids"],
            "token_type_ids": self.pool["token_type_ids"],
            "text_token_mask": self.pool["text_token_mask"]
        }
# Core inference function (fixed input size + batch_size=1).
def predict(
    ort_session,
    tokenizer: AutoTokenizer,
    memory_pool: HIPMemoryPool,
    image: np.array,
    caption: str,
    box_threshold: float,
    text_threshold: float,
    device: str = "cpu",
    remove_combined: bool = False,
    is_benchmark: bool = False
) -> Tuple[torch.Tensor, torch.Tensor, List[str]]:
    """GroundingDINO inference that reuses the pre-allocated buffer pool.

    Same contract as the non-pooled variant, but every ONNX input is
    written into *memory_pool*'s fixed-size arrays.
    """
    # 1. Text preprocessing
    caption = preprocess_caption(caption=caption)
    # 2. Encode the text (tokenizer is reused, not reloaded)
    tokenized = tokenizer([caption], padding="longest", return_tensors="pt").to(device)
    specical_tokens = tokenizer.convert_tokens_to_ids(["[CLS]", "[SEP]", ".", "?"])
    # 3. Build attention masks and position information
    (
        text_self_attention_masks,
        position_ids,
        cate_to_token_mask_list,
    ) = generate_masks_with_special_tokens_and_transfer_map(
        tokenized, specical_tokens, tokenizer)
    # 4. Truncate over-long text
    max_text_len = memory_pool.max_text_len
    if text_self_attention_masks.shape[1] > max_text_len:
        text_self_attention_masks = text_self_attention_masks[:, :max_text_len, :max_text_len]
        position_ids = position_ids[:, :max_text_len]
        tokenized["input_ids"] = tokenized["input_ids"][:, :max_text_len]
        tokenized["attention_mask"] = tokenized["attention_mask"][:, :max_text_len]
        tokenized["token_type_ids"] = tokenized["token_type_ids"][:, :max_text_len]
    # 5. Reuse the fixed-size buffer pool
    img_input = memory_pool.update_img_buffer(image)
    text_inputs = memory_pool.update_text_buffers(tokenized, position_ids, text_self_attention_masks)
    input_dict = {
        "img": img_input,
        "input_ids": text_inputs["input_ids"],
        "attention_mask": text_inputs["attention_mask"],
        "position_ids": text_inputs["position_ids"],
        "token_type_ids": text_inputs["token_type_ids"],
        "text_token_mask": text_inputs["text_token_mask"]
    }
    # 6. Run inference (no per-stage timing, fewer synchronizations)
    t0 = time.time()
    outputs = ort_session.run(['logits', 'boxes'], input_dict)
    infer_time = time.time() - t0
    if not is_benchmark:
        print(f"Inference time: {infer_time:.3f}s")
    # 7. Collect predictions
    prediction_logits = np.apply_along_axis(sigmoid, -1, outputs[0][0])
    prediction_boxes = outputs[1][0]
    if not is_benchmark:
        print(f"\n=== Debug Info ===")
        print(f"Prediction logits shape: {prediction_logits.shape}")
        print(f"Prediction boxes shape: {prediction_boxes.shape}")
        print(f"Max logit value: {np.max(prediction_logits):.4f}")
        print(f"Mean logit value: {np.mean(prediction_logits):.4f}")
    # 8. Filter by score threshold
    max_values = np.max(prediction_logits, axis=1)
    mask = max_values > box_threshold
    logits = prediction_logits[mask]
    boxes = prediction_boxes[mask]
    # 9. Generate text labels
    tokenized_caption = tokenizer(caption)
    if remove_combined:
        sep_idx = [i for i in range(len(tokenized_caption['input_ids']))
                   if tokenized_caption['input_ids'][i] in [101, 102, 1012]]
        phrases = []
        for logit in logits:
            max_idx = logit.argmax()
            insert_idx = bisect.bisect_left(sep_idx, max_idx)
            # Guard both ends in case argmax falls outside the separator range.
            right_idx = sep_idx[insert_idx] if insert_idx < len(sep_idx) else len(logit)
            left_idx = sep_idx[insert_idx - 1] if insert_idx > 0 else 0
            phrases.append(
                get_phrases_from_posmap(logit > text_threshold, tokenized_caption,
                                        tokenizer, left_idx, right_idx).replace('.', '')
            )
    else:
        phrases = [
            get_phrases_from_posmap(logit > text_threshold, tokenized_caption, tokenizer).replace('.', '')
            for logit in logits
        ]
    return boxes, np.max(logits, axis=1), phrases
# Benchmark routine (batch_size=1).
def benchmark_performance(
    ort_session, tokenizer, memory_pool, image, caption, box_threshold, text_threshold,
    warmup_runs=5, test_runs=10, device="cpu", batch_size=1
):
    """Latency benchmark at batch_size=1 with the fixed 800x1200 input.

    Warm-up runs load kernels and are excluded from the statistics;
    returns a dict with latency stats (ms) and average FPS.
    """
    print("="*60)
    print("📊 开始性能测试(固定800x1200,batch_size=1)")
    print("="*60)
    # 1. Warm-up phase (loads HIP modules; not counted)
    print(f"\n🔥 预热阶段({warmup_runs} 次)- 加载HIP模块")
    warmup_start = time.time()
    for i in range(warmup_runs):
        t0 = time.time()
        predict(ort_session, tokenizer, memory_pool, image, caption,
                box_threshold, text_threshold, device, is_benchmark=True)
        warmup_time = time.time() - t0
        print(f"预热 {i+1}/{warmup_runs} - 耗时: {warmup_time*1000:.2f} ms")
    total_warmup_time = time.time() - warmup_start
    print(f"\n预热完成 - 总耗时: {total_warmup_time:.3f} s (HIP模块已加载完成)")
    # 2. Timed inference phase (batch_size=1)
    print(f"\n🚀 实际推理测试({test_runs} 次,batch_size=1)")
    test_start = time.time()
    infer_times = []
    # Single-image runs (batch_size=1)
    for i in range(test_runs):
        t0 = time.time()
        predict(ort_session, tokenizer, memory_pool, image, caption,
                box_threshold, text_threshold, device, is_benchmark=True)
        infer_time = time.time() - t0
        infer_times.append(infer_time)
        print(f"实际推理 {i+1}/{test_runs} - 耗时: {infer_time*1000:.2f} ms")
    # Aggregate the metrics
    total_test_time = time.time() - test_start
    avg_infer_time = np.mean(infer_times)
    std_infer_time = np.std(infer_times)
    max_infer_time = np.max(infer_times)
    min_infer_time = np.min(infer_times)
    fps = test_runs / total_test_time
    # Print the performance report
    print("\n" + "="*60)
    print("📈 优化后性能测试报告(固定800x1200)")
    print("="*60)
    print(f"测试次数: {test_runs} 次 (batch_size=1)")
    print(f"总推理耗时: {total_test_time:.3f} s")
    print(f"平均推理耗时: {avg_infer_time*1000:.2f} ms (±{std_infer_time*1000:.2f} ms)")
    print(f"最大推理耗时: {max_infer_time*1000:.2f} ms")
    print(f"最小推理耗时: {min_infer_time*1000:.2f} ms")
    print(f"平均FPS: {fps:.2f} 帧/秒")
    print("="*60)
    return {
        "warmup_runs": warmup_runs,
        "test_runs": test_runs,
        "batch_size": batch_size,
        "avg_infer_time_ms": avg_infer_time*1000,
        "std_infer_time_ms": std_infer_time*1000,
        "max_infer_time_ms": max_infer_time*1000,
        "min_infer_time_ms": min_infer_time*1000,
        "fps": fps
    }
if __name__ == '__main__':
    # ========== Fixed configuration (800x1200, batch_size=1) ==========
    model_path = 'weights/ground.onnx'
    img_path = 'images/in/car_1.jpg'
    TEXT_PROMPT = "car ."
    BOX_TRESHOLD = 0.35
    TEXT_TRESHOLD = 0.25
    DEVICE = "cpu"  # switch to "rocm" for real deployments
    WARMUP_RUNS = 5  # warm-up iterations
    TEST_RUNS = 10  # timed iterations
    BATCH_SIZE = 1  # fixed at 1
    IMG_SHAPE = (3, 800, 1200)  # fixed export shape
    MAX_TEXT_LEN = 256
    # ========== ONNX Runtime tuning (targeting ROCm/HIP) ==========
    print("🔍 加载ONNX模型(固定800x1200,batch_size=1)")
    sess_options = ort.SessionOptions()
    # Enable every graph optimization
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    # Sequential execution (pre-load HIP kernels rather than on demand)
    sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
    sess_options.enable_cpu_mem_arena = False
    sess_options.enable_mem_pattern = True
    sess_options.log_severity_level = 3
    # ROCm/HIP provider options
    providers = [
        ('ROCMExecutionProvider', {
            'device_id': 0,
            'arena_extend_strategy': 'kNextPowerOfTwo',
            'gpu_mem_limit': 8 * 1024 * 1024 * 1024,  # 8 GB GPU memory cap
            'cudnn_conv_algo_search': 'EXHAUSTIVE',
            'do_copy_in_default_stream': True  # fewer stream synchronizations
        }),
        'CPUExecutionProvider'
    ]
    # ========== Load the model once (avoids the hipModuleLoadData bottleneck) ==========
    ort_session = ort.InferenceSession(
        model_path,
        sess_options=sess_options,
        providers=providers
    )
    current_provider = ort_session.get_providers()
    print(f"✅ 模型加载完成 - 当前执行引擎: {current_provider}")
    # ========== Pre-load the tokenizer (once) ==========
    print("\n📝 预加载BERT Tokenizer")
    t0 = time.time()
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    print(f"✅ Tokenizer加载完成 - 耗时: {(time.time() - t0):.3f} s")
    # ========== Initialize the fixed-size buffer pool (800x1200) ==========
    print("\n🗃️ 初始化固定尺寸内存池(800x1200)")
    memory_pool = HIPMemoryPool(img_shape=IMG_SHAPE, max_text_len=MAX_TEXT_LEN, device=DEVICE)
    print(f"✅ 内存池初始化完成 - 固定尺寸: {IMG_SHAPE}")
    # ========== Load the image and enforce the export resolution ==========
    print("\n🖼️ 加载并预处理测试图片(强制800x1200)")
    image_source, image = load_image(img_path)
    # Force 800x1200 so the input matches the exported model
    if image.shape != IMG_SHAPE:
        print(f"⚠️ 图片尺寸{image.shape}不符,强制调整为{IMG_SHAPE}")
        image = cv2.resize(image.transpose(1,2,0), (IMG_SHAPE[2], IMG_SHAPE[1])).transpose(2,0,1)
    print(f"✅ 图片加载完成 - 最终尺寸: {image.shape}")
    # ========== Benchmark ==========
    performance_result = benchmark_performance(
        ort_session, tokenizer, memory_pool, image, TEXT_PROMPT,
        BOX_TRESHOLD, TEXT_TRESHOLD,
        WARMUP_RUNS, TEST_RUNS, DEVICE, BATCH_SIZE
    )
    # ========== Final inference ==========
    print("\n" + "="*60)
    print("🎯 执行最终推理(固定800x1200)")
    print("="*60)
    boxes, confs, phrases = predict(
        ort_session, tokenizer, memory_pool, image, TEXT_PROMPT,
        BOX_TRESHOLD, TEXT_TRESHOLD, DEVICE
    )
    # Draw detections and save the annotated image
    os.makedirs('./images/out', exist_ok=True)
    ori_img = cv2.imread(img_path)
    # Resize the original image to the inference resolution
    ori_img = cv2.resize(ori_img, (IMG_SHAPE[2], IMG_SHAPE[1]))
    img_h, img_w = ori_img.shape[:2]
    for i in range(len(boxes)):
        one_box = boxes[i]
        one_conf = confs[i]
        one_cls = phrases[i]
        # Convert box (cx, cy, w, h) -> (x1, y1, x2, y2) in pixels
        x1 = int((one_box[0] - one_box[2] / 2) * img_w)
        y1 = int((one_box[1] - one_box[3] / 2) * img_h)
        x2 = int((one_box[0] + one_box[2] / 2) * img_w)
        y2 = int((one_box[1] + one_box[3] / 2) * img_h)
        # Draw the box and its label
        cv2.rectangle(ori_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
        cv2.putText(
            ori_img, f'{one_cls} {one_conf:.2f}',
            (x1-15, y1-15),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.5, (255, 255, 255), 3
        )
    # Save the result
    cv2.imwrite('./images/out/result_800x1200.jpg', ori_img)
    print(f"\n✅ 结果已保存至: ./images/out/result_800x1200.jpg")
    print(f"✅ 检测到目标: {phrases} (共 {len(boxes)} 个)")
    print(f"✅ 性能指标: FPS={performance_result['fps']:.2f}, 平均耗时={performance_result['avg_infer_time_ms']:.2f}ms")
\ No newline at end of file
from typing import Tuple, List, Dict
import cv2
import numpy as np
import torch
import onnxruntime as ort
from transformers import BertTokenizer, AutoTokenizer
import bisect
import time
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
# Adds inference-latency metrics (per-stage timing and benchmark support).
def sigmoid(x):
    """Map raw logits to probabilities in (0, 1)."""
    one = 1.0
    return one / (one + np.exp(-x))
def get_phrases_from_posmap(
    posmap: np.ndarray, tokenized: Dict, tokenizer: AutoTokenizer, left_idx: int = 0, right_idx: int = 255
):
    """Decode the phrase whose token positions are flagged True in *posmap*.

    NOTE(review): mutates *posmap* in place; callers build a fresh boolean
    array (logit > threshold) per call, so nothing leaks between calls.
    """
    assert isinstance(posmap, np.ndarray), "posmap must be np.ndarray"
    if posmap.ndim == 1:
        # Clear positions outside (left_idx, right_idx).
        posmap[:left_idx + 1] = False
        posmap[right_idx:] = False
        # Indices of the remaining active token positions.
        non_zero_idx = np.nonzero(posmap)[0]
        token_ids = [tokenized["input_ids"][i] for i in non_zero_idx]
        return tokenizer.decode(token_ids)
    else:
        raise NotImplementedError("posmap must be 1-dim")
def preprocess_caption(caption: str) -> str:
    """Normalize a prompt: lowercase, strip, and guarantee a trailing period."""
    text = caption.lower().strip()
    return text if text.endswith(".") else text + "."
# Core optimization: the tokenizer is preloaded by the caller and passed in,
# and the function accepts batched inputs (batch size inferred from `images`).
def predict_batch(
    ort_session,
    tokenizer: AutoTokenizer,  # tokenizer preloaded outside (reloading per call was the bottleneck)
    images: np.array,  # batched images (batch_size, 3, H, W)
    captions: List[str],  # one caption per image
    box_threshold: float,
    text_threshold: float,
    device: str = "cpu",
    remove_combined: bool = False,
    is_benchmark: bool = False  # benchmark mode suppresses per-step logging
) -> Tuple[List[torch.Tensor], List[torch.Tensor], List[List[str]]]:
    """
    Batched GroundingDINO inference through an onnxruntime session.

    Returns per-sample lists of (boxes, confidences, phrases). Boxes stay in
    the model's normalized (cx, cy, w, h) format.
    """
    BATCH_SIZE = images.shape[0]
    if not is_benchmark:
        print(f"\n开始批量推理 - batch_size: {BATCH_SIZE}")
    # 1. Text preprocessing: lowercase/strip and ensure a trailing period.
    t0 = time.time()
    captions = [preprocess_caption(caption=c) for c in captions]
    if not is_benchmark:
        print(f"Caption processing took {(time.time() - t0):.3f}s")
    # 2. Tokenize the whole batch in one call.
    t0 = time.time()
    tokenized = tokenizer(captions, padding="longest", return_tensors="pt").to(device)
    specical_tokens = tokenizer.convert_tokens_to_ids (["[CLS]", "[SEP]", ".", "?"])
    if not is_benchmark:
        print(f"Word embedding took {(time.time() - t0):.3f}s")
    # 3. Build text self-attention masks and position ids around special tokens.
    t0 = time.time()
    (
        text_self_attention_masks,
        position_ids,
        cate_to_token_mask_list,
    ) = generate_masks_with_special_tokens_and_transfer_map(
        tokenized, specical_tokens, tokenizer)
    if not is_benchmark:
        print(f"Generate attention masks took {(time.time() - t0):.3f}s")
    # 4. Truncate over-long text to the model's maximum text length.
    max_text_len = 256
    if text_self_attention_masks.shape[1] > max_text_len:
        text_self_attention_masks = text_self_attention_masks[
            :, : max_text_len, : max_text_len]
        position_ids = position_ids[:, : max_text_len]
        tokenized["input_ids"] = tokenized["input_ids"][:, : max_text_len]
        tokenized["attention_mask"] = tokenized["attention_mask"][:, : max_text_len]
        tokenized["token_type_ids"] = tokenized["token_type_ids"][:, : max_text_len]
    # 5. Run the ONNX model; attention_mask is fed as bool here.
    attention_mask = np.asarray(tokenized["attention_mask"]).astype(bool)
    input_dict = {
        "img": images,  # batched images (BATCH_SIZE, 3, H, W)
        "input_ids": np.asarray(tokenized["input_ids"]),
        "attention_mask": attention_mask,
        "position_ids": np.asarray(position_ids),
        "token_type_ids": np.asarray(tokenized["token_type_ids"]),
        "text_token_mask": np.asarray(text_self_attention_masks)
    }
    t0 = time.time()
    outputs = ort_session.run(['logits', 'boxes'], input_dict)
    infer_time = time.time() - t0
    if not is_benchmark:
        print(f"Inference time (batch): {infer_time:.3f}s")
        print(f"Single sample avg infer time: {infer_time/BATCH_SIZE:.3f}s")
    # 6. Post-process: sigmoid over the raw token logits.
    prediction_logits = np.apply_along_axis(sigmoid, -1, outputs[0])  # (B, N, L)
    prediction_boxes = outputs[1]  # (B, N, 4)
    if not is_benchmark:
        print(f"\n=== Debug Info (Batch) ===")
        print(f"Prediction logits shape: {prediction_logits.shape}")
        print(f"Prediction boxes shape: {prediction_boxes.shape}")
    # Per-sample result accumulators.
    all_boxes = []
    all_confs = []
    all_phrases = []
    # Process each sample of the batch independently.
    for idx in range(BATCH_SIZE):
        logits = prediction_logits[idx]
        boxes = prediction_boxes[idx]
        # 7. Keep queries whose best token score clears box_threshold.
        max_values = np.max(logits, axis=1)
        mask = max_values > box_threshold
        filtered_logits = logits[mask]
        filtered_boxes = boxes[mask]
        # 8. Re-tokenize this sample's caption for phrase extraction.
        single_tokenized = tokenizer(captions[idx])
        # 9. Map token positions back to caption phrases.
        if remove_combined:
            # 101/102/1012 are [CLS]/[SEP]/'.' ids — used as phrase separators.
            sep_idx = [i for i in range(len(single_tokenized['input_ids'])) if single_tokenized['input_ids'][i] in [101, 102, 1012]]
            phrases = []
            for logit in filtered_logits:
                max_idx = logit.argmax()
                insert_idx = bisect.bisect_left(sep_idx, max_idx)
                right_idx = sep_idx[insert_idx]
                left_idx = sep_idx[insert_idx - 1]
                phrases.append(
                    get_phrases_from_posmap(logit > text_threshold, single_tokenized, tokenizer, left_idx, right_idx).replace('.', '')
                )
        else:
            phrases = [
                get_phrases_from_posmap(logit > text_threshold, single_tokenized, tokenizer).replace('.', '')
                for logit in filtered_logits
            ]
        all_boxes.append(filtered_boxes)
        all_confs.append(np.max(filtered_logits, axis=1))
        all_phrases.append(phrases)
    return all_boxes, all_confs, all_phrases
# Batched performance test: untimed warmup runs followed by timed runs.
def benchmark_performance_batch(
    ort_session, tokenizer, batch_images, batch_captions, box_threshold, text_threshold,
    warmup_runs=5, test_runs=10, device="cpu"
):
    """
    Measure batched inference latency and throughput.

    Runs `warmup_runs` untimed predictions, then `test_runs` timed ones, and
    prints per-batch and per-sample latency plus FPS. Returns a metrics dict.
    """
    BATCH_SIZE = batch_images.shape[0]
    print("="*60)
    print(f"📊 开始批量性能测试(batch_size={BATCH_SIZE})")
    print("="*60)
    # 1. Warmup phase — excluded from the statistics.
    print(f"\n🔥 预热阶段({warmup_runs} 次)- 不计入性能统计")
    warmup_start = time.time()
    for i in range(warmup_runs):
        t0 = time.time()
        predict_batch(ort_session, tokenizer, batch_images, batch_captions,
                      box_threshold, text_threshold, device, is_benchmark=True)
        warmup_time = time.time() - t0
        print(f"预热 {i+1}/{warmup_runs} - 批次耗时: {warmup_time*1000:.2f} ms, 单样本平均: {warmup_time/BATCH_SIZE*1000:.2f} ms")
    total_warmup_time = time.time() - warmup_start
    print(f"\n预热完成 - 总耗时: {total_warmup_time:.3f} s, 批次平均: {total_warmup_time/warmup_runs*1000:.2f} ms")
    # 2. Timed measurement phase.
    print(f"\n🚀 实际推理测试阶段({test_runs} 次)- 统计性能指标")
    test_start = time.time()
    batch_infer_times = []  # wall-clock time of each timed batch
    for i in range(test_runs):
        t0 = time.time()
        predict_batch(ort_session, tokenizer, batch_images, batch_captions,
                      box_threshold, text_threshold, device, is_benchmark=True)
        infer_time = time.time() - t0
        batch_infer_times.append(infer_time)
        print(f"实际推理 {i+1}/{test_runs} - 批次耗时: {infer_time*1000:.2f} ms, 单样本平均: {infer_time/BATCH_SIZE*1000:.2f} ms")
    # 3. Aggregate metrics.
    total_test_time = time.time() - test_start
    total_samples = test_runs * BATCH_SIZE
    avg_batch_time = np.mean(batch_infer_times)
    std_batch_time = np.std(batch_infer_times)
    avg_sample_time = avg_batch_time / BATCH_SIZE
    # Per-sample FPS is the headline number; batch FPS is for reference only.
    fps = total_samples / total_test_time  # total samples / total elapsed time
    batch_fps = test_runs / total_test_time  # batches per second
    # 4. Report.
    print("\n" + "="*60)
    print(f"📈 批量性能测试报告(batch_size={BATCH_SIZE})")
    print("="*60)
    print(f"测试批次: {test_runs} 次, 总样本数: {total_samples}")
    print(f"总推理耗时: {total_test_time:.3f} s")
    print(f"平均批次耗时: {avg_batch_time*1000:.2f} ms (±{std_batch_time*1000:.2f} ms)")
    print(f"平均单样本耗时: {avg_sample_time*1000:.2f} ms")
    print(f"批次FPS: {batch_fps:.2f} 批次/秒")
    print(f"单样本FPS: {fps:.2f} 帧/秒 (核心指标)")
    print("="*60)
    return {
        "batch_size": BATCH_SIZE,
        "warmup_runs": warmup_runs,
        "test_runs": test_runs,
        "total_samples": total_samples,
        "avg_batch_time_ms": avg_batch_time*1000,
        "avg_sample_time_ms": avg_sample_time*1000,
        "batch_fps": batch_fps,
        "sample_fps": fps
    }
if __name__ == '__main__':
    # ---- configuration ----
    model_path = 'weights/ground_bs8.onnx'  # ONNX export with batch_size=8
    img_paths = [
        'images/in/car_1.jpg',
        'images/in/car_1.jpg',
        'images/in/car_1.jpg',
        'images/in/car_1.jpg',  # eight copies of the same image -> batch_size=8
        'images/in/car_1.jpg',
        'images/in/car_1.jpg',
        'images/in/car_1.jpg',
        'images/in/car_1.jpg'
    ]
    TEXT_PROMPTS = ["car .", "car .", "car .", "car .","car .", "car .", "car .", "car ."]  # one prompt per image
    BOX_TRESHOLD = 0.35
    TEXT_TRESHOLD = 0.25
    DEVICE = "cpu"
    WARMUP_RUNS = 5  # untimed warmup iterations
    TEST_RUNS = 10  # timed benchmark iterations
    BATCH_SIZE = 8
    # ===================== load the batched images =====================
    print("🔍 加载批量图像(batch_size=8)")
    batch_images = []
    batch_image_sources = []
    for img_path in img_paths:
        image_source, image = load_image(img_path)
        batch_image_sources.append(image_source)
        batch_images.append(image)
    # Stack into a (8, 3, H, W) numpy array for the ONNX "img" input.
    batch_images_np = np.stack(batch_images, axis=0)
    print(f"✅ 批量图像加载完成 - 形状: {batch_images_np.shape}")
    # ===================== create the ONNX Runtime session =====================
    print("\n🔍 加载ONNX模型(batch_size=8)")
    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL  # enable all graph optimizations
    sess_options.log_severity_level = 3  # quieter logs
    sess_options.enable_profiling = True  # a profiling JSON is written by end_profiling()
    ort_session = ort.InferenceSession(model_path,
                                       sess_options=sess_options,
                                       providers=['ROCMExecutionProvider'])
    # Show which execution provider(s) were actually selected.
    current_provider = ort_session.get_providers()
    print(f"✅ 模型加载完成 - 当前执行引擎: {current_provider}")
    # ===================== preload the tokenizer once =====================
    print("\n📝 预加载BERT Tokenizer(仅加载一次)")
    t0 = time.time()
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    print(f"✅ Tokenizer加载完成 - 耗时: {(time.time() - t0):.3f} s")
    # ===================== step 1: batched performance test =====================
    performance_result = benchmark_performance_batch(
        ort_session, tokenizer, batch_images_np, TEXT_PROMPTS,
        BOX_TRESHOLD, TEXT_TRESHOLD,
        WARMUP_RUNS, TEST_RUNS, DEVICE
    )
    # ===================== step 2: one fully-logged batched inference =====================
    print("\n" + "="*60)
    print("🎯 执行最终批量推理(带详细日志+保存结果)")
    print("="*60)
    all_boxes, all_confs, all_phrases = predict_batch(
        ort_session, tokenizer, batch_images_np, TEXT_PROMPTS,
        BOX_TRESHOLD, TEXT_TRESHOLD, DEVICE
    )
    # ===================== draw and save per-sample results =====================
    for idx in range(BATCH_SIZE):
        # Load the original image for drawing.
        ori_img = cv2.imread(img_paths[idx])
        img_h = ori_img.shape[0]
        img_w = ori_img.shape[1]
        # Draw every detection of this sample.
        boxes = all_boxes[idx]
        confs = all_confs[idx]
        phrases = all_phrases[idx]
        for i in range(len(boxes)):
            one_box = boxes[i]
            one_conf = confs[i]
            one_cls = phrases[i]
            # Convert normalized (cx, cy, w, h) to pixel (x1, y1, x2, y2).
            x1 = int((one_box[0] - one_box[2] / 2) * img_w)
            y1 = int((one_box[1] - one_box[3] / 2) * img_h)
            x2 = int((one_box[0] + one_box[2] / 2) * img_w)
            y2 = int((one_box[1] + one_box[3] / 2) * img_h)
            cv2.rectangle(ori_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(
                ori_img, f'{one_cls} {one_conf:.2f}',
                (x1-15, y1-15),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                color=(255, 255, 255),
                fontScale=1.5,
                thickness=3
            )
        # Save the annotated image.
        output_path = f'./images/out/result_{idx+1}.jpg'
        cv2.imwrite(output_path, ori_img)
        print(f"✅ 样本 {idx+1} 结果已保存至: {output_path}")
        print(f"   检测到目标: {phrases} (共 {len(boxes)} 个)")
    profile_file = ort_session.end_profiling()
    print(f"\n📊 Profiling 文件已生成: {profile_file}")
\ No newline at end of file
# Compile the ONNX export to a MIGraphX binary with debug logging enabled,
# running only the dead_code_elimination pass; the remaining passes are kept
# commented out below for experimentation.
MIGRAPHX_LOG=debug migraphx-driver compile \
--onnx weights/ground_external.onnx \
--gpu \
-p dead_code_elimination \
--output weights/ground.mgx
# -p eliminate_contiguous \
# -p simplify_reshapes \
# -p simplify_algebra \
# -p eliminate_identity \
# -p common_subexpression_elimination \
\ No newline at end of file
import cv2
import numpy as np
import torch
import time
import os
import migraphx
from transformers import BertTokenizer
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
# =========================
# Helper functions
# =========================
def sigmoid(x):
    """Logistic function applied elementwise."""
    return np.reciprocal(1.0 + np.exp(-x))
def preprocess_caption(caption: str) -> str:
    """Lowercase/strip the prompt and append a '.' when one is missing."""
    cleaned = caption.lower().strip()
    if not cleaned.endswith("."):
        cleaned += "."
    return cleaned
def to_mgx(x):
    """Wrap a numpy array in a migraphx.argument, coercing to a supported dtype.

    int64 and bool pass through unchanged; every other dtype is fed as fp32.
    """
    if x.dtype == np.int64:
        target = np.int64
    elif x.dtype == np.bool_:
        target = np.bool_
    else:
        target = np.float32
    return migraphx.argument(x.astype(target))
def _mgx_shape_to_numpy(shape):
# 将 migraphx input shape 映射到 numpy dtype + lens 以生成零填充张量
shape_str = str(shape)
if "int64_type" in shape_str:
dtype = np.int64
elif "bool_type" in shape_str:
dtype = np.bool_
elif "half_type" in shape_str:
dtype = np.float16
else:
dtype = np.float32
try:
dims = list(shape.dims())
except Exception:
dims = []
try:
lens = list(shape.lens())
except Exception:
lens = []
# 优先用 dims,dims 为空时才退化到 lens
return dtype, (dims if len(dims) > 0 else lens)
# =========================
# 🚀 MIGraphX inference wrapper (with compile cache)
# =========================
class MIGraphXModel:
    # Wraps a MIGraphX program: parses the ONNX model and compiles it for GPU
    # on first use, then serializes the compiled program to `cache_path` so
    # later constructions can load it directly.
    def __init__(self, onnx_path, cache_path="weights/ground.mxr", force_recompile=False):
        """Load a cached compiled model if present, otherwise parse/compile/save."""
        self.cache_path = cache_path
        # ====== prefer the on-disk compiled cache ======
        if os.path.exists(cache_path) and not force_recompile:
            print(f"⚡ 直接加载已编译模型: {cache_path}")
            self.model = migraphx.load(cache_path)
        else:
            print("🔍 从 ONNX 构建 MIGraphX")
            self.model = migraphx.parse_onnx(onnx_path)
            print(self.model)
            # ====== dump the model's input/output signature for debugging ======
            print("=== 模型输入信息 ===")
            inputs = self.model.get_inputs()
            for key, value in inputs.items():
                print(f"{key}: {value}")
            print("\n=== 模型输出信息 ===")
            outputs = self.model.get_outputs()
            for key, value in outputs.items():
                print(f"{key}: {value}")
            """
            === 模型输入信息 ===
            text_token_mask: bool_type, {1, 4, 4}, {16, 4, 1}
            token_type_ids: int64_type, {1, 4}, {4, 1}
            position_ids: int64_type, {1, 4}, {4, 1}
            attention_mask: bool_type, {1, 4}, {4, 1}
            input_ids: int64_type, {1, 4}, {4, 1}
            img: float_type, {1, 3, 800, 1200}, {2880000, 960000, 1200, 1}
            === 模型输出信息 ===
            boxes: float_type, {1, 900, 4}, {3600, 4, 1}
            logits: float_type, {1, 900, 256}, {230400, 256, 1}
            输入节点名称: text_token_mask
            输入形状 (N, C, H, W): [1, 4, 4]
            """
            # FP16 quantization was tried and is intentionally disabled:
            # print("\n⚡ 量化模型(FP16)")
            # migraphx.quantize_fp16(self.model)
            print("⚙️ 编译 MIGraphX(GPU)")
            self.model.compile(
                t=migraphx.get_target("gpu"),device_id=5
            )
            # offload_copy=False, fast_math=False, exhaustive_tune=False
            # ====== save the compiled program ======
            print(f"💾 保存编译模型到: {cache_path}")
            migraphx.save(self.model, cache_path)
        # Cache the program's parameter names and input shapes for infer().
        self.param_names = self.model.get_parameter_names()
        self.input_shapes = self.model.get_inputs()
        print("✅ param_names:", self.param_names)
        print("✅ input_shape:", self.input_shapes)
        try:
            self.output_shapes = self.model.get_outputs()
            print("✅ output_shapes keys:", list(self.output_shapes.keys()))
        except Exception:
            self.output_shapes = None
    def infer(self, input_dict):
        """Run one forward pass; returns (outputs as np.ndarrays, elapsed seconds)."""
        # Assemble arguments strictly by the model's declared input signature.
        mgx_inputs = {}
        provided_names = set(input_dict.keys())
        # Some .mxr files also expose internal output aliases through
        # get_parameter_names()/get_inputs(); exclude main:#output_* here so an
        # internal output is never treated as a model input.
        required_names = {
            k for k in self.input_shapes.keys()
            if not str(k).startswith("main:#output")
        }
        missing = required_names - provided_names
        if missing:
            print("⚠️ 缺失模型输入,准备按 shape 自动补齐:")
            for name in sorted(missing):
                shape = self.input_shapes[name]
                dtype, lens = _mgx_shape_to_numpy(shape)
                # Zero-fill any declared input the caller did not provide.
                mgx_inputs[name] = to_mgx(np.zeros(lens, dtype=dtype))
                print(f"  - {name}: shape={lens}, dtype={dtype.__name__}")
        for name in (required_names & provided_names):
            mgx_inputs[name] = to_mgx(input_dict[name])
        # Extra keys are ignored so they cannot clash with the internal signature.
        extra = provided_names - required_names
        if extra:
            print("ℹ️ 有多余输入参数将被忽略:")
            for name in sorted(extra):
                print(f"  - {name}")
        start = time.time()
        result = self.model.run(mgx_inputs)
        infer_time = time.time() - start
        outputs = [np.array(r) for r in result]
        return outputs, infer_time
# =========================
# Inference
# =========================
def predict(
    model,
    tokenizer,
    image,
    caption,
    box_threshold,
    text_threshold,
    is_benchmark=False
):
    """Run one MIGraphX inference with pre-baked text inputs and return
    (boxes, confidences, phrases) filtered by `box_threshold`."""
    # NOTE(review): the text tensors below are hard-coded for the fixed prompt
    # "car ." (input_ids 101/2482/1012/102); `tokenizer`, `caption` and
    # `text_threshold` are currently unused — confirm before reusing this
    # function with any other prompt.
    input_dict = {
        "img": np.expand_dims(np.asarray(image), axis=0).astype(np.float32),
        "position_ids": np.array([[0, 0, 1, 0]], dtype=np.int64),
        "input_ids": np.array([[101, 2482, 1012, 102]], dtype=np.int64),
        "token_type_ids": np.array([[0, 0, 0, 0]], dtype=np.int64),
        "text_token_mask": np.array([[
            [True, False, False, False],
            [False, True, True, False],
            [False, True, True, False],
            [False, False, False, True]
        ]], dtype=np.bool_),
        "attention_mask": np.array([[True, True, True, True]], dtype=np.bool_)
    }
    outputs, infer_time = model.infer(input_dict)
    if not is_benchmark:
        print(f"Inference time: {infer_time*1000:.2f} ms")
    # outputs[0] is treated as logits, outputs[1] as boxes — presumably the
    # output order of the compiled model; verify against the program dump.
    logits = sigmoid(outputs[0][0])
    boxes = outputs[1][0]
    # Keep detections whose best token score clears box_threshold.
    max_values = np.max(logits, axis=1)
    mask = max_values > box_threshold
    logits = logits[mask]
    boxes = boxes[mask]
    # Phrase extraction is skipped; every detection is labeled "object".
    phrases = ["object"] * len(boxes)
    return boxes, np.max(logits, axis=1), phrases
# =========================
# Benchmark
# =========================
def benchmark(model, tokenizer, image, caption, box_th, text_th, warmup=5, runs=10):
    """Warm the model up, then time `runs` predictions and print mean latency/FPS."""
    print("\n🔥 预热")
    for _ in range(warmup):
        predict(model, tokenizer, image, caption, box_th, text_th, True)
    print("\n🚀 测试")
    samples = []
    for _ in range(runs):
        tic = time.time()
        predict(model, tokenizer, image, caption, box_th, text_th, True)
        samples.append(time.time() - tic)
    mean_latency = np.mean(samples)
    print(f"\n平均耗时: {mean_latency*1000:.2f} ms")
    print(f"FPS: {1/mean_latency:.2f}")
# =========================
# Entry point
# =========================
if __name__ == "__main__":
    model_path = "weights/ground_simplified.onnx"
    cache_path = "weights/ground_simplified.mxr"  # ⭐ compiled-model cache file
    img_path = "images/in/car_1.jpg"
    TEXT_PROMPT = "car ."
    BOX_TRESHOLD = 0.35
    TEXT_TRESHOLD = 0.25
    # 🚀 Load the model (compiles and caches the program on first run).
    model = MIGraphXModel(
        model_path,
        cache_path=cache_path,
        force_recompile=False  # set True to force a recompilation
    )
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    image_source, image = load_image(img_path)
    # Benchmark first, then run one logged prediction and report the phrases.
    benchmark(model, tokenizer, image, TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD)
    boxes, confs, phrases = predict(
        model, tokenizer, image,
        TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD
    )
    print("检测结果:", phrases)
\ No newline at end of file
# Manually apply a chosen subset of MIGraphX optimization passes, then compile
# the program for GPU and save it.
# NOTE(review): the Python bindings may not expose the pass_* constructors or
# Program.apply_passes in all MIGraphX releases, and the target is obtained as
# `migraphx.get_target("gpu")` elsewhere in this repo — confirm this script
# actually runs against the installed MIGraphX version.
import migraphx as mgx
p = mgx.parse_onnx("weights/ground_external.onnx")  # parse only, no optimization
passes = [
    mgx.pass_dead_code_elimination(),  # drop unused nodes/constants
    mgx.pass_eliminate_contiguous(),  # merge adjacent contiguous ops
    mgx.pass_simplify_reshapes(),  # fuse/simplify reshapes
    mgx.pass_simplify_algebra(),  # simplify algebraic expressions (add/mul/..)
    mgx.pass_eliminate_identity(),  # remove Identity ops
    mgx.pass_common_subexpression_elimination(),  # CSE
]
p.apply_passes(passes)  # run the passes explicitly
p.compile(mgx.target("gpu"))
p.save("weights/ground.mgx")
\ No newline at end of file
import cv2
import numpy as np
import torch
import time
import os
# Enable verbose MIGraphX debug artifacts and tracing; these environment
# variables are set before `import migraphx` so they take effect at load time.
os.environ["MIGRAPHX_SAVE_TEMPS"] = "1"
os.environ["MIGRAPHX_TRACE"] = "1"
os.environ["MIGRAPHX_LOG_LEVEL"] = "DEBUG"
import migraphx
from transformers import BertTokenizer
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
# =========================
# Helper functions
# =========================
def sigmoid(x):
    """Logistic function, applied elementwise for array inputs."""
    return np.divide(1.0, 1.0 + np.exp(-x))
def preprocess_caption(caption: str) -> str:
    """Return the lowercased, stripped caption, terminated with a period."""
    normalized = caption.lower().strip()
    suffix = "" if normalized.endswith(".") else "."
    return normalized + suffix
def to_mgx(x):
    """Convert a numpy array to migraphx.argument (int64/bool kept, rest fp32)."""
    for kind in (np.int64, np.bool_):
        if x.dtype == kind:
            return migraphx.argument(x.astype(kind))
    return migraphx.argument(x.astype(np.float32))
def _mgx_shape_to_numpy(shape):
"""将 migraphx shape 转为 numpy dtype 和 lens。"""
shape_str = str(shape)
if "int64_type" in shape_str:
dtype = np.int64
elif "bool_type" in shape_str:
dtype = np.bool_
elif "half_type" in shape_str:
dtype = np.float16
else:
dtype = np.float32
return dtype, list(shape.lens())
# =========================
# 🚀 MIGraphX inference wrapper (with compile cache)
# =========================
class MIGraphXModel:
    """GroundingDINO inference through MIGraphX with an on-disk compile cache.

    The first construction parses the ONNX file, compiles it for GPU and saves
    the compiled program to `cache_path`; later constructions load that file
    directly, which is much faster.
    """

    def __init__(self, onnx_path, cache_path="weights/ground_xiongke.mxr", force_recompile=False):
        """Load the cached compiled model if present, otherwise parse/compile/save.

        :param onnx_path: path to the ONNX model (used only when compiling)
        :param cache_path: where the compiled .mxr program is stored
        :param force_recompile: ignore an existing cache and recompile
        """
        self.cache_path = cache_path
        # Prefer the on-disk compiled cache.
        if os.path.exists(cache_path) and not force_recompile:
            print(f"⚡ 直接加载已编译模型: {cache_path}")
            self.model = migraphx.load(cache_path)
        else:
            print("🔍 从 ONNX 构建 MIGraphX")
            self.model = migraphx.parse_onnx(onnx_path)
            print(self.model)
            # Dump the model's input/output signature for debugging.
            print("=== 模型输入信息 ===")
            inputs = self.model.get_inputs()
            for key, value in inputs.items():
                print(f"{key}: {value}")
            print("\n=== 模型输出信息 ===")
            outputs = self.model.get_outputs()
            for key, value in outputs.items():
                print(f"{key}: {value}")
            # Show the first two input names together with their shapes.
            input_names = list(inputs.keys())
            inputName = input_names[0]
            inputShape = inputs[inputName].lens()
            print(f"\n输入节点名称: {inputName}")
            print(f"输入形状 (N, C, H, W): {inputShape}")
            inputName1 = input_names[1]
            # Bug fix: this previously looked up inputs[inputName] again, so
            # the second input was reported with the FIRST input's shape.
            inputShape1 = inputs[inputName1].lens()
            print(f"\n输入节点名称: {inputName1}")
            print(f"输入形状 (N, C, H, W): {inputShape1}")
            # FP16 quantization and extra optimization passes were tried here
            # and are intentionally left disabled.
            print("⚙️ 编译 MIGraphX(GPU)")
            self.model.compile(
                t=migraphx.get_target("gpu"), device_id=5
            )
            # Save the compiled program so future runs skip compilation.
            print(f"💾 保存编译模型到: {cache_path}")
            migraphx.save(self.model, cache_path)
        # Cache the parameter names / input shapes for infer().
        self.param_names = self.model.get_parameter_names()
        self.input_shapes = self.model.get_inputs()
        print("✅ 输入节点:", self.param_names)

    def infer(self, input_dict):
        """Run one forward pass.

        :param input_dict: mapping of input name -> numpy array
        :return: (list of np.ndarray outputs, wall-clock seconds)
        """
        mgx_inputs = {k: to_mgx(v) for k, v in input_dict.items()}
        # Some .mxr files built with certain passes disabled expose internal
        # alias parameters (e.g. main:#output_*). Leaving them unbound can
        # trigger a VMFault at run time, so zero-fill anything not provided.
        auto_filled = []
        for name in self.param_names:
            if name in mgx_inputs:
                continue
            if name not in self.input_shapes:
                continue
            dtype, lens = _mgx_shape_to_numpy(self.input_shapes[name])
            mgx_inputs[name] = to_mgx(np.zeros(lens, dtype=dtype))
            auto_filled.append((name, lens, dtype.__name__))
        if auto_filled:
            print("⚠️ 自动补齐内部输入参数:")
            for item in auto_filled:
                print(f"  - {item[0]} shape={item[1]} dtype={item[2]}")
        start = time.time()
        result = self.model.run(mgx_inputs)
        infer_time = time.time() - start
        outputs = [np.array(r) for r in result]
        return outputs, infer_time
# =========================
# Inference
# =========================
def predict(
    model,
    tokenizer,
    image,
    caption,
    box_threshold,
    text_threshold,
    is_benchmark=False
):
    """Tokenize `caption`, build the text masks, run the MIGraphX model and
    return (boxes, confidences, phrases) filtered by `box_threshold`."""
    caption = preprocess_caption(caption)
    captions = [caption]
    tokenized = tokenizer(captions, padding="longest", return_tensors="pt")
    specical_tokens = tokenizer.convert_tokens_to_ids(["[CLS]", "[SEP]", ".", "?"])
    # Build per-token self-attention masks and position ids around the
    # special tokens ([CLS]/[SEP]/./?).
    (
        text_self_attention_masks,
        position_ids,
        _
    ) = generate_masks_with_special_tokens_and_transfer_map(
        tokenized, specical_tokens, tokenizer
    )
    # Truncate over-long text to the model's maximum text length.
    max_text_len = 256
    if text_self_attention_masks.shape[1] > max_text_len:
        text_self_attention_masks = text_self_attention_masks[:, :max_text_len, :max_text_len]
        position_ids = position_ids[:, :max_text_len]
        tokenized["input_ids"] = tokenized["input_ids"][:, :max_text_len]
        tokenized["attention_mask"] = tokenized["attention_mask"][:, :max_text_len]
        tokenized["token_type_ids"] = tokenized["token_type_ids"][:, :max_text_len]
    # Cast every tensor to the dtype expected by the compiled model.
    input_dict = {
        "img": np.expand_dims(np.asarray(image), axis=0).astype(np.float32),
        "input_ids": np.asarray(tokenized["input_ids"]).astype(np.int64),
        "attention_mask": np.asarray(tokenized["attention_mask"]).astype(np.bool_),
        "position_ids": np.asarray(position_ids).astype(np.int64),
        "token_type_ids": np.asarray(tokenized["token_type_ids"]).astype(np.int64),
        "text_token_mask": np.asarray(text_self_attention_masks).astype(np.bool_)
    }
    outputs, infer_time = model.infer(input_dict)
    if not is_benchmark:
        print(f"Inference time: {infer_time*1000:.2f} ms")
    # outputs[0] -> logits, outputs[1] -> boxes; keep detections whose best
    # token score clears box_threshold.
    logits = sigmoid(outputs[0][0])
    boxes = outputs[1][0]
    max_values = np.max(logits, axis=1)
    mask = max_values > box_threshold
    logits = logits[mask]
    boxes = boxes[mask]
    # text_threshold is unused here: phrases are not recovered from tokens.
    phrases = ["object"] * len(boxes)
    return boxes, np.max(logits, axis=1), phrases
# =========================
# Benchmark
# =========================
def benchmark(model, tokenizer, image, caption, box_th, text_th, warmup=5, runs=10):
    """Warm the model up `warmup` times, then time `runs` predictions and
    print the mean latency and FPS."""
    print("\n🔥 预热")
    for _ in range(warmup):
        predict(model, tokenizer, image, caption, box_th, text_th, True)
    print("\n🚀 测试")
    times = []
    for i in range(runs):
        start = time.time()
        predict(model, tokenizer, image, caption, box_th, text_th, True)
        times.append(time.time() - start)
    print(f"\n平均耗时: {np.mean(times)*1000:.2f} ms")
    print(f"FPS: {1/np.mean(times):.2f}")
# =========================
# Entry point
# =========================
if __name__ == "__main__":
    #model_path = "weights/ground.onnx"
    model_path = "weights/ground_fixed.onnx"
    cache_path = "weights/ground_xiongke.mxr"  # ⭐ compiled-model cache file
    img_path = "images/in/car_1.jpg"
    TEXT_PROMPT = "car ."
    BOX_TRESHOLD = 0.35
    TEXT_TRESHOLD = 0.25
    # 🚀 Load the model (compiles and caches the program on first run).
    model = MIGraphXModel(
        model_path,
        cache_path=cache_path,
        force_recompile=False  # set True to force a recompilation
    )
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    image_source, image = load_image(img_path)
    # Benchmark first, then run one logged prediction and report the phrases.
    benchmark(model, tokenizer, image, TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD)
    boxes, confs, phrases = predict(
        model, tokenizer, image,
        TEXT_PROMPT, BOX_TRESHOLD, TEXT_TRESHOLD
    )
    print("检测结果:", phrases)
from typing import Tuple, List, Dict
import cv2
import numpy as np
import torch
import onnxruntime as ort
from transformers import BertTokenizer, AutoTokenizer
import bisect
import time
from groundingdino.util.inference import load_image
from groundingdino.models.GroundingDINO.bertwarper import generate_masks_with_special_tokens_and_transfer_map
# Inference-latency metrics are collected by the benchmark helper below.
def sigmoid(x):
    """Elementwise logistic: maps raw logits into (0, 1)."""
    exp_neg = np.exp(-x)
    return 1 / (1 + exp_neg)
def get_phrases_from_posmap(
    posmap, tokenized, tokenizer, left_idx: int = 0, right_idx: int = 255
):
    """Decode the caption tokens flagged by `posmap` (a 1-D bool array).

    Positions at or below `left_idx` and at or beyond `right_idx` are cleared
    in place so boundary/special tokens never appear in the phrase.
    """
    assert isinstance(posmap, np.ndarray), "posmap must be np.ndarray"
    if posmap.ndim == 1:
        posmap[:left_idx + 1] = False
        posmap[right_idx:] = False
        ids = [tokenized["input_ids"][pos] for pos in np.nonzero(posmap)[0]]
        return tokenizer.decode(ids)
    raise NotImplementedError("posmap must be 1-dim")
def preprocess_caption(caption: str) -> str:
    """Lowercase + strip the caption and make sure it ends with '.'."""
    stripped = caption.lower().strip()
    if stripped[-1:] == ".":
        return stripped
    return stripped + "."
# Core optimization: the tokenizer is preloaded by the caller and passed in.
def predict(
    ort_session,
    tokenizer: AutoTokenizer,  # preloaded tokenizer (reloading per call was the bottleneck)
    image: np.array,
    caption: str,
    box_threshold: float,
    text_threshold: float,
    device: str = "cpu",
    remove_combined: bool = False,
    is_benchmark: bool = False  # benchmark mode suppresses per-step logging
) -> Tuple[torch.Tensor, torch.Tensor, List[str]]:
    """Single-image GroundingDINO inference through onnxruntime.

    Returns (boxes, confidences, phrases); boxes stay in the model's
    normalized (cx, cy, w, h) format.
    """
    # 1. Text preprocessing.
    t0 = time.time()
    caption = preprocess_caption(caption=caption)
    if not is_benchmark:
        print(f"Caption processing took {(time.time() - t0):.3f}s")
    captions = [caption]
    # 2. Tokenize the caption.
    t0 = time.time()
    tokenized = tokenizer(captions, padding="longest", return_tensors="pt").to(device)
    specical_tokens = tokenizer.convert_tokens_to_ids (["[CLS]", "[SEP]", ".", "?"])
    if not is_benchmark:
        print(f"Word embedding took {(time.time() - t0):.3f}s")
    # 3. Build attention masks / position ids around the special tokens.
    t0 = time.time()
    (
        text_self_attention_masks,
        position_ids,
        cate_to_token_mask_list,
    ) = generate_masks_with_special_tokens_and_transfer_map(
        tokenized, specical_tokens, tokenizer)
    if not is_benchmark:
        print(f"Generate attention masks took {(time.time() - t0):.3f}s")
    # 4. Truncate over-long text.
    max_text_len = 256
    if text_self_attention_masks.shape[1] > max_text_len:
        text_self_attention_masks = text_self_attention_masks[
            :, : max_text_len, : max_text_len]
        position_ids = position_ids[:, : max_text_len]
        tokenized["input_ids"] = tokenized["input_ids"][:, : max_text_len]
        tokenized["attention_mask"] = tokenized["attention_mask"][:, : max_text_len]
        tokenized["token_type_ids"] = tokenized["token_type_ids"][:, : max_text_len]
    # 5. Convert boolean masks (True = visible) into additive masks (0 / -1e9).
    # NOTE(review): other variants in this repo feed these inputs as bool —
    # confirm that this particular ONNX export expects additive float masks.
    attention_mask = tokenized["attention_mask"].float()
    attention_mask = (1 - attention_mask) * -1e9  # visible -> 0, masked -> -1e9
    attention_mask = np.asarray(attention_mask)
    # text_token_mask gets the same treatment (if it participates in attention).
    text_self_attention_masks = text_self_attention_masks.float()
    text_self_attention_masks = (1 - text_self_attention_masks) * -1e9
    input_dict = {
        "img": np.expand_dims(np.asarray(image), axis=0),
        "input_ids": np.asarray(tokenized["input_ids"]),
        "attention_mask": attention_mask,
        "position_ids": np.asarray(position_ids),
        "token_type_ids": np.asarray(tokenized["token_type_ids"]),
        "text_token_mask": np.asarray(text_self_attention_masks)
    }
    t0 = time.time()
    outputs = ort_session.run(['logits', 'boxes'], input_dict)
    infer_time = time.time() - t0
    if not is_benchmark:
        print(f"Inference time: {infer_time:.3f}s")
    # 6. Post-process: sigmoid over the token logits of the single sample.
    prediction_logits = np.apply_along_axis(sigmoid, -1, outputs[0][0])
    prediction_boxes = outputs[1][0]
    if not is_benchmark:
        print(f"\n=== Debug Info ===")
        print(f"Prediction logits shape: {prediction_logits.shape}")
        print(f"Prediction boxes shape: {prediction_boxes.shape}")
        print(f"Max logit value: {np.max(prediction_logits):.4f}")
        print(f"Mean logit value: {np.mean(prediction_logits):.4f}")
    # 7. Keep queries whose best token score clears box_threshold.
    max_values = np.max(prediction_logits, axis=1)
    mask = max_values > box_threshold
    logits = prediction_logits[mask]
    boxes = prediction_boxes[mask]
    # 8. Re-tokenize the raw caption for phrase extraction.
    tokenized = tokenizer(caption)
    # 9. Map token positions back to caption phrases.
    if remove_combined:
        # 101/102/1012 are [CLS]/[SEP]/'.' ids — used as phrase separators.
        sep_idx = [i for i in range(len(tokenized['input_ids'])) if tokenized['input_ids'][i] in [101, 102, 1012]]
        phrases = []
        for logit in logits:
            max_idx = logit.argmax()
            insert_idx = bisect.bisect_left(sep_idx, max_idx)
            right_idx = sep_idx[insert_idx]
            left_idx = sep_idx[insert_idx - 1]
            phrases.append(
                get_phrases_from_posmap(logit > text_threshold, tokenized, tokenizer, left_idx, right_idx).replace('.', '')
            )
    else:
        phrases = [
            get_phrases_from_posmap(logit > text_threshold, tokenized, tokenizer).replace('.', '')
            for logit in logits
        ]
    return boxes, np.max(logits, axis=1), phrases
# 新增:完整的性能测试函数(包含预热+实际推理)
def benchmark_performance(
    ort_session, tokenizer, image, caption, box_threshold, text_threshold,
    warmup_runs=5, test_runs=10, device="cpu"
):
    """
    Latency benchmark: run untimed warm-up iterations first, then timed
    runs, and report mean/std/min/max latency plus throughput (FPS).
    :param warmup_runs: number of warm-up iterations (excluded from stats)
    :param test_runs: number of timed iterations used for the statistics
    :return: dict with latency statistics in ms and average FPS
    """
    sep = "=" * 60
    print(sep)
    print("📊 开始性能测试(包含预热+实际推理)")
    print(sep)

    # --- Warm-up phase: timings are printed but never recorded ---
    print(f"\n🔥 预热阶段({warmup_runs} 次)- 不计入性能统计")
    warmup_start = time.time()
    for run_idx in range(warmup_runs):
        tick = time.time()
        predict(ort_session, tokenizer, image, caption, box_threshold, text_threshold, device, is_benchmark=True)
        warmup_time = time.time() - tick
        print(f"预热 {run_idx+1}/{warmup_runs} - 耗时: {warmup_time*1000:.2f} ms")
    total_warmup_time = time.time() - warmup_start
    print(f"\n预热完成 - 总耗时: {total_warmup_time:.3f} s, 平均每次: {total_warmup_time/warmup_runs*1000:.2f} ms")

    # --- Timed phase: per-run latencies are collected for statistics ---
    print(f"\n🚀 实际推理测试阶段({test_runs} 次)- 统计性能指标")
    test_start = time.time()
    infer_times = []  # per-run latency in seconds
    for run_idx in range(test_runs):
        tick = time.time()
        predict(ort_session, tokenizer, image, caption, box_threshold, text_threshold, device, is_benchmark=True)
        elapsed = time.time() - tick
        infer_times.append(elapsed)
        print(f"实际推理 {run_idx+1}/{test_runs} - 耗时: {elapsed*1000:.2f} ms")
    total_test_time = time.time() - test_start

    # --- Aggregate statistics over the timed runs only ---
    avg_infer_time = np.mean(infer_times)
    std_infer_time = np.std(infer_times)
    max_infer_time = np.max(infer_times)
    min_infer_time = np.min(infer_times)
    fps = test_runs / total_test_time

    # --- Human-readable report ---
    print("\n" + sep)
    print("📈 性能测试报告(仅实际推理阶段)")
    print(sep)
    print(f"测试次数: {test_runs} 次")
    print(f"总推理耗时: {total_test_time:.3f} s")
    print(f"平均推理耗时: {avg_infer_time*1000:.2f} ms (±{std_infer_time*1000:.2f} ms)")
    print(f"最大推理耗时: {max_infer_time*1000:.2f} ms")
    print(f"最小推理耗时: {min_infer_time*1000:.2f} ms")
    print(f"平均FPS: {fps:.2f} 帧/秒")
    print(sep)

    return {
        "warmup_runs": warmup_runs,
        "test_runs": test_runs,
        "avg_infer_time_ms": avg_infer_time*1000,
        "std_infer_time_ms": std_infer_time*1000,
        "max_infer_time_ms": max_infer_time*1000,
        "min_infer_time_ms": min_infer_time*1000,
        "fps": fps
    }
if __name__ == '__main__':
    import os  # needed below to create the output directory

    # --- Configuration ---
    model_path = 'weights/ground_test.onnx'
    img_path = 'images/in/car_1.jpg'
    TEXT_PROMPT = "car ."
    BOX_THRESHOLD = 0.35   # minimum box confidence kept by predict()
    TEXT_THRESHOLD = 0.25  # minimum token-match confidence for phrase extraction
    DEVICE = "cpu"
    WARMUP_RUNS = 5        # warm-up iterations (excluded from benchmark stats)
    TEST_RUNS = 10         # timed benchmark iterations

    # Load the input image (original source + preprocessed tensor).
    image_source, image = load_image(img_path)

    # Build the ONNX Runtime session with all graph optimizations enabled.
    print("🔍 加载ONNX模型")
    sess_options = ort.SessionOptions()
    sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    sess_options.log_severity_level = 3  # suppress verbose ORT logging
    # sess_options.enable_profiling = True  # enable ORT profiling if needed
    ort_session = ort.InferenceSession(model_path,
                                       sess_options=sess_options,
                                       providers=['ROCMExecutionProvider']
                                       )
    # Report which execution provider(s) ORT actually selected.
    current_provider = ort_session.get_providers()
    print(f"✅ 模型加载完成 - 当前执行引擎: {current_provider}")

    # Load the tokenizer once up front so per-inference runs don't pay for it.
    print("\n📝 预加载BERT Tokenizer(仅加载一次)")
    t0 = time.time()
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    print(f"✅ Tokenizer加载完成 - 耗时: {(time.time() - t0):.3f} s")

    # Step 1: full benchmark (warm-up + timed runs).
    performance_result = benchmark_performance(
        ort_session, tokenizer, image, TEXT_PROMPT,
        BOX_THRESHOLD, TEXT_THRESHOLD,
        WARMUP_RUNS, TEST_RUNS, DEVICE
    )

    # Step 2: one verbose inference whose detections are rendered and saved.
    print("\n" + "=" * 60)
    print("🎯 执行最终推理(带详细日志+保存结果)")
    print("=" * 60)
    boxes, confs, phrases = predict(
        ort_session, tokenizer, image, TEXT_PROMPT,
        BOX_THRESHOLD, TEXT_THRESHOLD, DEVICE
    )

    # Draw detections. Boxes appear to be (cx, cy, w, h) normalized to
    # [0, 1] given the center/half-size math below — confirm against predict().
    ori_img = cv2.imread(img_path)
    img_h, img_w = ori_img.shape[:2]
    for one_box, one_conf, one_cls in zip(boxes, confs, phrases):
        x1 = int((one_box[0] - one_box[2] / 2) * img_w)
        y1 = int((one_box[1] - one_box[3] / 2) * img_h)
        x2 = int((one_box[0] + one_box[2] / 2) * img_w)
        y2 = int((one_box[1] + one_box[3] / 2) * img_h)
        cv2.rectangle(ori_img, (x1, y1), (x2, y2), (0, 0, 255), 2)
        cv2.putText(
            ori_img, f'{one_cls} {one_conf:.2f}',
            (x1 - 15, y1 - 15),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            color=(255, 255, 255),
            fontScale=1.5,
            thickness=3
        )

    # Ensure the output directory exists; cv2.imwrite fails silently otherwise.
    os.makedirs('./images/out', exist_ok=True)
    cv2.imwrite('./images/out/result.jpg', ori_img)
    print(f"\n✅ 结果已保存至: ./images/out/result.jpg")
    print(f"✅ 检测到目标: {phrases} (共 {len(boxes)} 个)")
    # profile_file = ort_session.end_profiling()
    # print(f"\n📊 Profiling 文件已生成: {profile_file}")
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment