Commit 7e0b0e35 authored by Your Name's avatar Your Name
Browse files

提交RetinaFace推理示例

parent 5ba3127f
Pipeline #282 failed with stages
in 0 seconds
# RetinaFace_MIGraphX # RetinaFace
构建RetinaFace推理示例 ## 模型介绍
\ No newline at end of file
RetinaFace是一个经典的人脸检测模型(https://arxiv.org/abs/1905.00641),采用了SSD架构。
## 模型结构
RetinaFace模型 有几个主要特点:
- 采用 FPN 特征金字塔提取多尺度特征;
- 引入 SSH 算法的 Context Modeling;
- 多任务训练,提供额外的监督信息。
## 推理
### 环境配置
[光源](https://www.sourcefind.cn/#/image/dcu/custom)可拉取用于推理的docker镜像,RetinaFace 模型推理推荐的镜像如下:
```
docker pull image.sourcefind.cn:5000/dcu/admin/base/custom:ort_dcu_1.14.0_migraphx2.5.2_dtk22.10.1
```
[光合开发者社区](https://cancon.hpccube.com:65024/4/main/)可下载MIGraphX安装包,python依赖安装:
```
pip install -r requirements.txt
```
安装DTK版的Pytorch和torchvision,下载地址:https://cancon.hpccube.com:65024/4/main/pytorch,https://cancon.hpccube.com:65024/4/main/vision
### 运行示例
RetinaFace模型的推理示例程序是RetinaFace_infer_migraphx.py,使用如下命令运行该推理示例:
```
python RetinaFace_infer_migraphx.py
```
程序运行结束会在当前目录生成RetinaFace检测结果图像。
<img src="./curve/Result.jpg" alt="Result" style="zoom: 50%;" />
## 历史版本
​ https://developer.hpccube.com/codes/modelzoo/retinaface_migraphx
## 参考
​ https://github.com/biubug6/Pytorch_Retinaface
from __future__ import print_function
import os
import argparse
import torch
import torch.backends.cudnn as cudnn
import numpy as np
from data import cfg_mnet, cfg_re50
from layers.functions.prior_box import PriorBox
from utils.nms.py_cpu_nms import py_cpu_nms
import cv2
from utils.box_utils import decode, decode_landm
import time
import migraphx
parser = argparse.ArgumentParser(description='Retinaface')
parser.add_argument('-m', '--trained_model', default='./weights/mobilenet0.25_Final.pth',
type=str, help='Trained state_dict file path to open')
parser.add_argument('--network', default='mobile0.25', help='Backbone network mobile0.25 or resnet50')
parser.add_argument('--cpu', action="store_true", default=False, help='Use cpu inference')
parser.add_argument('--confidence_threshold', default=0.85, type=float, help='confidence_threshold')
parser.add_argument('--top_k', default=5000, type=int, help='top_k')
parser.add_argument('--nms_threshold', default=0.4, type=float, help='nms_threshold')
parser.add_argument('--keep_top_k', default=750, type=int, help='keep_top_k')
parser.add_argument('-s', '--save_image', action="store_true", default=True, help='show detection results')
parser.add_argument('--vis_thres', default=0.6, type=float, help='visualization_threshold')
args = parser.parse_args()
def migraphx_run(model,cpu,data_tensor):
# 将输入的tensor数据转换为numpy
if cpu:
data_numpy=data_tensor.cpu().numpy()
device = torch.device("cpu")
else:
data_numpy=data_tensor.detach().cpu().numpy()
device = torch.device("cuda")
img_data = np.zeros(data_numpy.shape).astype("float32")
for i in range(data_numpy.shape[0]):
img_data[i, :, :, :] = data_numpy[i, :, :, :]
# 执行推理
result = model.run({model.get_parameter_names()[0]: migraphx.argument(img_data)})
# 将结果转换为tensor
result0=torch.from_numpy(np.array(result[0], copy=False)).to(device)
result1=torch.from_numpy(np.array(result[1], copy=False)).to(device)
result2=torch.from_numpy(np.array(result[2], copy=False)).to(device)
return (result0,result1,result2)
if __name__ == '__main__':
# 加载模型
cfg = None
if args.network == "mobile0.25":
cfg = cfg_mnet
elif args.network == "resnet50":
cfg = cfg_re50
device = torch.device("cpu" if args.cpu else "cuda")
model = migraphx.parse_onnx("./FaceDetector.onnx")
inputName=model.get_parameter_names()[0]
inputShape=model.get_parameter_shapes()[inputName].lens()
print("inputName:{0} \ninputShape:{1}".format(inputName,inputShape))
# FP16
# migraphx.quantize_fp16(model)
# 编译
model.compile(t=migraphx.get_target("gpu"),device_id=0) # device_id: 设置GPU设备,默认为0号设备
resize = 1
# testing begin
for i in range(100):
# resize到onnx模型输入大小
image_path = "./curve/test.jpg"
img_raw = cv2.imread(image_path, cv2.IMREAD_COLOR)
img_raw = cv2.resize(img_raw, (640,640))
img = np.float32(img_raw)
im_height, im_width, _ = img.shape
scale = torch.Tensor([img.shape[1], img.shape[0], img.shape[1], img.shape[0]])
img -= (104, 117, 123)
img = img.transpose(2, 0, 1)
img = torch.from_numpy(img).unsqueeze(0)
img = img.to(device)
scale = scale.to(device)
tic = time.time()
loc, conf, landms = migraphx_run(model,args.cpu,img) # forward pass
print('net forward time: {:.4f}'.format(time.time() - tic))
priorbox = PriorBox(cfg, image_size=(im_height, im_width))
priors = priorbox.forward()
priors = priors.to(device)
prior_data = priors.data
boxes = decode(loc.data.squeeze(0), prior_data, cfg['variance'])
boxes = boxes * scale / resize
boxes = boxes.cpu().numpy()
scores = conf.squeeze(0).data.cpu().numpy()[:, 1]
landms = decode_landm(landms.data.squeeze(0), prior_data, cfg['variance'])
scale1 = torch.Tensor([img.shape[3], img.shape[2], img.shape[3], img.shape[2],
img.shape[3], img.shape[2], img.shape[3], img.shape[2],
img.shape[3], img.shape[2]])
scale1 = scale1.to(device)
landms = landms * scale1 / resize
landms = landms.cpu().numpy()
# ignore low scores
inds = np.where(scores > args.confidence_threshold)[0]
boxes = boxes[inds]
landms = landms[inds]
scores = scores[inds]
# keep top-K before NMS
order = scores.argsort()[::-1][:args.top_k]
boxes = boxes[order]
landms = landms[order]
scores = scores[order]
# do NMS
dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32, copy=False)
keep = py_cpu_nms(dets, args.nms_threshold)
# keep = nms(dets, args.nms_threshold,force_cpu=args.cpu)
dets = dets[keep, :]
landms = landms[keep]
# keep top-K faster NMS
dets = dets[:args.keep_top_k, :]
landms = landms[:args.keep_top_k, :]
dets = np.concatenate((dets, landms), axis=1)
# show image
if args.save_image:
for b in dets:
if b[4] < args.vis_thres:
continue
text = "{:.4f}".format(b[4])
b = list(map(int, b))
cv2.rectangle(img_raw, (b[0], b[1]), (b[2], b[3]), (0, 0, 255), 2)
cx = b[0]
cy = b[1] + 12
cv2.putText(img_raw, text, (cx, cy),
cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
# landms
cv2.circle(img_raw, (b[5], b[6]), 1, (0, 0, 255), 4)
cv2.circle(img_raw, (b[7], b[8]), 1, (0, 255, 255), 4)
cv2.circle(img_raw, (b[9], b[10]), 1, (255, 0, 255), 4)
cv2.circle(img_raw, (b[11], b[12]), 1, (0, 255, 0), 4)
cv2.circle(img_raw, (b[13], b[14]), 1, (255, 0, 0), 4)
# save image
name = "test.jpg"
cv2.imwrite(name, img_raw)
from __future__ import print_function
import os
import argparse
import torch
import torch.backends.cudnn as cudnn
import numpy as np
from data import cfg_mnet, cfg_re50
from layers.functions.prior_box import PriorBox
from utils.nms.py_cpu_nms import py_cpu_nms
import cv2
from models.retinaface import RetinaFace
from utils.box_utils import decode, decode_landm
from utils.timer import Timer
parser = argparse.ArgumentParser(description='Test')
parser.add_argument('-m', '--trained_model', default='./weights/mobilenet0.25_Final.pth',
type=str, help='Trained state_dict file path to open')
parser.add_argument('--network', default='mobile0.25', help='Backbone network mobile0.25 or resnet50')
parser.add_argument('--long_side', default=640, help='when origin_size is false, long_side is scaled size(320 or 640 for long side)')
parser.add_argument('--cpu', action="store_true", default=True, help='Use cpu inference')
args = parser.parse_args()
def check_keys(model, pretrained_state_dict):
ckpt_keys = set(pretrained_state_dict.keys())
model_keys = set(model.state_dict().keys())
used_pretrained_keys = model_keys & ckpt_keys
unused_pretrained_keys = ckpt_keys - model_keys
missing_keys = model_keys - ckpt_keys
print('Missing keys:{}'.format(len(missing_keys)))
print('Unused checkpoint keys:{}'.format(len(unused_pretrained_keys)))
print('Used keys:{}'.format(len(used_pretrained_keys)))
assert len(used_pretrained_keys) > 0, 'load NONE from pretrained checkpoint'
return True
def remove_prefix(state_dict, prefix):
''' Old style model is stored with all names of parameters sharing common prefix 'module.' '''
print('remove prefix \'{}\''.format(prefix))
f = lambda x: x.split(prefix, 1)[-1] if x.startswith(prefix) else x
return {f(key): value for key, value in state_dict.items()}
def load_model(model, pretrained_path, load_to_cpu):
print('Loading pretrained model from {}'.format(pretrained_path))
if load_to_cpu:
pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage)
else:
device = torch.cuda.current_device()
pretrained_dict = torch.load(pretrained_path, map_location=lambda storage, loc: storage.cuda(device))
if "state_dict" in pretrained_dict.keys():
pretrained_dict = remove_prefix(pretrained_dict['state_dict'], 'module.')
else:
pretrained_dict = remove_prefix(pretrained_dict, 'module.')
check_keys(model, pretrained_dict)
model.load_state_dict(pretrained_dict, strict=False)
return model
if __name__ == '__main__':
torch.set_grad_enabled(False)
cfg = None
if args.network == "mobile0.25":
cfg = cfg_mnet
elif args.network == "resnet50":
cfg = cfg_re50
# net and model
net = RetinaFace(cfg=cfg, phase = 'test')
net = load_model(net, args.trained_model, args.cpu)
net.eval()
print('Finished loading model!')
print(net)
device = torch.device("cpu" if args.cpu else "cuda")
net = net.to(device)
# ------------------------ export -----------------------------
output_onnx = 'FaceDetector.onnx'
print("==> Exporting model to ONNX format at '{}'".format(output_onnx))
input_names = ["input0"]
output_names = ["output0"]
inputs = torch.randn(1, 3, args.long_side, args.long_side).to(device)
torch_out = torch.onnx._export(net, inputs, output_onnx, export_params=True, verbose=False,
input_names=input_names, output_names=output_names)
This diff is collapsed.
from .wider_face import WiderFaceDetection, detection_collate
from .data_augment import *
from .config import *
# config.py
cfg_mnet = {
'name': 'mobilenet0.25',
'min_sizes': [[16, 32], [64, 128], [256, 512]],
'steps': [8, 16, 32],
'variance': [0.1, 0.2],
'clip': False,
'loc_weight': 2.0,
'gpu_train': True,
'batch_size': 32,
'ngpu': 1,
'epoch': 250,
'decay1': 190,
'decay2': 220,
'image_size': 640,
'pretrain': False,
'return_layers': {'stage1': 1, 'stage2': 2, 'stage3': 3},
'in_channel': 32,
'out_channel': 64
}
cfg_re50 = {
'name': 'Resnet50',
'min_sizes': [[16, 32], [64, 128], [256, 512]],
'steps': [8, 16, 32],
'variance': [0.1, 0.2],
'clip': False,
'loc_weight': 2.0,
'gpu_train': True,
'batch_size': 24,
'ngpu': 4,
'epoch': 100,
'decay1': 70,
'decay2': 90,
'image_size': 840,
'pretrain': True,
'return_layers': {'layer2': 1, 'layer3': 2, 'layer4': 3},
'in_channel': 256,
'out_channel': 256
}
import cv2
import numpy as np
import random
from utils.box_utils import matrix_iof
def _crop(image, boxes, labels, landm, img_dim):
height, width, _ = image.shape
pad_image_flag = True
for _ in range(250):
"""
if random.uniform(0, 1) <= 0.2:
scale = 1.0
else:
scale = random.uniform(0.3, 1.0)
"""
PRE_SCALES = [0.3, 0.45, 0.6, 0.8, 1.0]
scale = random.choice(PRE_SCALES)
short_side = min(width, height)
w = int(scale * short_side)
h = w
if width == w:
l = 0
else:
l = random.randrange(width - w)
if height == h:
t = 0
else:
t = random.randrange(height - h)
roi = np.array((l, t, l + w, t + h))
value = matrix_iof(boxes, roi[np.newaxis])
flag = (value >= 1)
if not flag.any():
continue
centers = (boxes[:, :2] + boxes[:, 2:]) / 2
mask_a = np.logical_and(roi[:2] < centers, centers < roi[2:]).all(axis=1)
boxes_t = boxes[mask_a].copy()
labels_t = labels[mask_a].copy()
landms_t = landm[mask_a].copy()
landms_t = landms_t.reshape([-1, 5, 2])
if boxes_t.shape[0] == 0:
continue
image_t = image[roi[1]:roi[3], roi[0]:roi[2]]
boxes_t[:, :2] = np.maximum(boxes_t[:, :2], roi[:2])
boxes_t[:, :2] -= roi[:2]
boxes_t[:, 2:] = np.minimum(boxes_t[:, 2:], roi[2:])
boxes_t[:, 2:] -= roi[:2]
# landm
landms_t[:, :, :2] = landms_t[:, :, :2] - roi[:2]
landms_t[:, :, :2] = np.maximum(landms_t[:, :, :2], np.array([0, 0]))
landms_t[:, :, :2] = np.minimum(landms_t[:, :, :2], roi[2:] - roi[:2])
landms_t = landms_t.reshape([-1, 10])
# make sure that the cropped image contains at least one face > 16 pixel at training image scale
b_w_t = (boxes_t[:, 2] - boxes_t[:, 0] + 1) / w * img_dim
b_h_t = (boxes_t[:, 3] - boxes_t[:, 1] + 1) / h * img_dim
mask_b = np.minimum(b_w_t, b_h_t) > 0.0
boxes_t = boxes_t[mask_b]
labels_t = labels_t[mask_b]
landms_t = landms_t[mask_b]
if boxes_t.shape[0] == 0:
continue
pad_image_flag = False
return image_t, boxes_t, labels_t, landms_t, pad_image_flag
return image, boxes, labels, landm, pad_image_flag
def _distort(image):
def _convert(image, alpha=1, beta=0):
tmp = image.astype(float) * alpha + beta
tmp[tmp < 0] = 0
tmp[tmp > 255] = 255
image[:] = tmp
image = image.copy()
if random.randrange(2):
#brightness distortion
if random.randrange(2):
_convert(image, beta=random.uniform(-32, 32))
#contrast distortion
if random.randrange(2):
_convert(image, alpha=random.uniform(0.5, 1.5))
image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
#saturation distortion
if random.randrange(2):
_convert(image[:, :, 1], alpha=random.uniform(0.5, 1.5))
#hue distortion
if random.randrange(2):
tmp = image[:, :, 0].astype(int) + random.randint(-18, 18)
tmp %= 180
image[:, :, 0] = tmp
image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
else:
#brightness distortion
if random.randrange(2):
_convert(image, beta=random.uniform(-32, 32))
image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
#saturation distortion
if random.randrange(2):
_convert(image[:, :, 1], alpha=random.uniform(0.5, 1.5))
#hue distortion
if random.randrange(2):
tmp = image[:, :, 0].astype(int) + random.randint(-18, 18)
tmp %= 180
image[:, :, 0] = tmp
image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
#contrast distortion
if random.randrange(2):
_convert(image, alpha=random.uniform(0.5, 1.5))
return image
def _expand(image, boxes, fill, p):
if random.randrange(2):
return image, boxes
height, width, depth = image.shape
scale = random.uniform(1, p)
w = int(scale * width)
h = int(scale * height)
left = random.randint(0, w - width)
top = random.randint(0, h - height)
boxes_t = boxes.copy()
boxes_t[:, :2] += (left, top)
boxes_t[:, 2:] += (left, top)
expand_image = np.empty(
(h, w, depth),
dtype=image.dtype)
expand_image[:, :] = fill
expand_image[top:top + height, left:left + width] = image
image = expand_image
return image, boxes_t
def _mirror(image, boxes, landms):
_, width, _ = image.shape
if random.randrange(2):
image = image[:, ::-1]
boxes = boxes.copy()
boxes[:, 0::2] = width - boxes[:, 2::-2]
# landm
landms = landms.copy()
landms = landms.reshape([-1, 5, 2])
landms[:, :, 0] = width - landms[:, :, 0]
tmp = landms[:, 1, :].copy()
landms[:, 1, :] = landms[:, 0, :]
landms[:, 0, :] = tmp
tmp1 = landms[:, 4, :].copy()
landms[:, 4, :] = landms[:, 3, :]
landms[:, 3, :] = tmp1
landms = landms.reshape([-1, 10])
return image, boxes, landms
def _pad_to_square(image, rgb_mean, pad_image_flag):
if not pad_image_flag:
return image
height, width, _ = image.shape
long_side = max(width, height)
image_t = np.empty((long_side, long_side, 3), dtype=image.dtype)
image_t[:, :] = rgb_mean
image_t[0:0 + height, 0:0 + width] = image
return image_t
def _resize_subtract_mean(image, insize, rgb_mean):
interp_methods = [cv2.INTER_LINEAR, cv2.INTER_CUBIC, cv2.INTER_AREA, cv2.INTER_NEAREST, cv2.INTER_LANCZOS4]
interp_method = interp_methods[random.randrange(5)]
image = cv2.resize(image, (insize, insize), interpolation=interp_method)
image = image.astype(np.float32)
image -= rgb_mean
return image.transpose(2, 0, 1)
class preproc(object):
def __init__(self, img_dim, rgb_means):
self.img_dim = img_dim
self.rgb_means = rgb_means
def __call__(self, image, targets):
assert targets.shape[0] > 0, "this image does not have gt"
boxes = targets[:, :4].copy()
labels = targets[:, -1].copy()
landm = targets[:, 4:-1].copy()
image_t, boxes_t, labels_t, landm_t, pad_image_flag = _crop(image, boxes, labels, landm, self.img_dim)
image_t = _distort(image_t)
image_t = _pad_to_square(image_t,self.rgb_means, pad_image_flag)
image_t, boxes_t, landm_t = _mirror(image_t, boxes_t, landm_t)
height, width, _ = image_t.shape
image_t = _resize_subtract_mean(image_t, self.img_dim, self.rgb_means)
boxes_t[:, 0::2] /= width
boxes_t[:, 1::2] /= height
landm_t[:, 0::2] /= width
landm_t[:, 1::2] /= height
labels_t = np.expand_dims(labels_t, 1)
targets_t = np.hstack((boxes_t, landm_t, labels_t))
return image_t, targets_t
import os
import os.path
import sys
import torch
import torch.utils.data as data
import cv2
import numpy as np
class WiderFaceDetection(data.Dataset):
def __init__(self, txt_path, preproc=None):
self.preproc = preproc
self.imgs_path = []
self.words = []
f = open(txt_path,'r')
lines = f.readlines()
isFirst = True
labels = []
for line in lines:
line = line.rstrip()
if line.startswith('#'):
if isFirst is True:
isFirst = False
else:
labels_copy = labels.copy()
self.words.append(labels_copy)
labels.clear()
path = line[2:]
path = txt_path.replace('label.txt','images/') + path
self.imgs_path.append(path)
else:
line = line.split(' ')
label = [float(x) for x in line]
labels.append(label)
self.words.append(labels)
def __len__(self):
return len(self.imgs_path)
def __getitem__(self, index):
img = cv2.imread(self.imgs_path[index])
height, width, _ = img.shape
labels = self.words[index]
annotations = np.zeros((0, 15))
if len(labels) == 0:
return annotations
for idx, label in enumerate(labels):
annotation = np.zeros((1, 15))
# bbox
annotation[0, 0] = label[0] # x1
annotation[0, 1] = label[1] # y1
annotation[0, 2] = label[0] + label[2] # x2
annotation[0, 3] = label[1] + label[3] # y2
# landmarks
annotation[0, 4] = label[4] # l0_x
annotation[0, 5] = label[5] # l0_y
annotation[0, 6] = label[7] # l1_x
annotation[0, 7] = label[8] # l1_y
annotation[0, 8] = label[10] # l2_x
annotation[0, 9] = label[11] # l2_y
annotation[0, 10] = label[13] # l3_x
annotation[0, 11] = label[14] # l3_y
annotation[0, 12] = label[16] # l4_x
annotation[0, 13] = label[17] # l4_y
if (annotation[0, 4]<0):
annotation[0, 14] = -1
else:
annotation[0, 14] = 1
annotations = np.append(annotations, annotation, axis=0)
target = np.array(annotations)
if self.preproc is not None:
img, target = self.preproc(img, target)
return torch.from_numpy(img), target
def detection_collate(batch):
"""Custom collate fn for dealing with batches of images that have a different
number of associated object annotations (bounding boxes).
Arguments:
batch: (tuple) A tuple of tensor images and lists of annotations
Return:
A tuple containing:
1) (tensor) batch of images stacked on their 0 dim
2) (list of tensors) annotations for a given image are stacked on 0 dim
"""
targets = []
imgs = []
for _, sample in enumerate(batch):
for _, tup in enumerate(sample):
if torch.is_tensor(tup):
imgs.append(tup)
elif isinstance(tup, type(np.empty(0))):
annos = torch.from_numpy(tup).float()
targets.append(annos)
return (torch.stack(imgs, 0), targets)
from .functions import *
from .modules import *
import torch
from itertools import product as product
import numpy as np
from math import ceil
class PriorBox(object):
def __init__(self, cfg, image_size=None, phase='train'):
super(PriorBox, self).__init__()
self.min_sizes = cfg['min_sizes']
self.steps = cfg['steps']
self.clip = cfg['clip']
self.image_size = image_size
self.feature_maps = [[ceil(self.image_size[0]/step), ceil(self.image_size[1]/step)] for step in self.steps]
self.name = "s"
def forward(self):
anchors = []
for k, f in enumerate(self.feature_maps):
min_sizes = self.min_sizes[k]
for i, j in product(range(f[0]), range(f[1])):
for min_size in min_sizes:
s_kx = min_size / self.image_size[1]
s_ky = min_size / self.image_size[0]
dense_cx = [x * self.steps[k] / self.image_size[1] for x in [j + 0.5]]
dense_cy = [y * self.steps[k] / self.image_size[0] for y in [i + 0.5]]
for cy, cx in product(dense_cy, dense_cx):
anchors += [cx, cy, s_kx, s_ky]
# back to torch land
output = torch.Tensor(anchors).view(-1, 4)
if self.clip:
output.clamp_(max=1, min=0)
return output
from .multibox_loss import MultiBoxLoss
__all__ = ['MultiBoxLoss']
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from utils.box_utils import match, log_sum_exp
from data import cfg_mnet
GPU = cfg_mnet['gpu_train']
class MultiBoxLoss(nn.Module):
"""SSD Weighted Loss Function
Compute Targets:
1) Produce Confidence Target Indices by matching ground truth boxes
with (default) 'priorboxes' that have jaccard index > threshold parameter
(default threshold: 0.5).
2) Produce localization target by 'encoding' variance into offsets of ground
truth boxes and their matched 'priorboxes'.
3) Hard negative mining to filter the excessive number of negative examples
that comes with using a large number of default bounding boxes.
(default negative:positive ratio 3:1)
Objective Loss:
L(x,c,l,g) = (Lconf(x, c) + αLloc(x,l,g)) / N
Where, Lconf is the CrossEntropy Loss and Lloc is the SmoothL1 Loss
weighted by α which is set to 1 by cross val.
Args:
c: class confidences,
l: predicted boxes,
g: ground truth boxes
N: number of matched default boxes
See: https://arxiv.org/pdf/1512.02325.pdf for more details.
"""
def __init__(self, num_classes, overlap_thresh, prior_for_matching, bkg_label, neg_mining, neg_pos, neg_overlap, encode_target):
super(MultiBoxLoss, self).__init__()
self.num_classes = num_classes
self.threshold = overlap_thresh
self.background_label = bkg_label
self.encode_target = encode_target
self.use_prior_for_matching = prior_for_matching
self.do_neg_mining = neg_mining
self.negpos_ratio = neg_pos
self.neg_overlap = neg_overlap
self.variance = [0.1, 0.2]
def forward(self, predictions, priors, targets):
"""Multibox Loss
Args:
predictions (tuple): A tuple containing loc preds, conf preds,
and prior boxes from SSD net.
conf shape: torch.size(batch_size,num_priors,num_classes)
loc shape: torch.size(batch_size,num_priors,4)
priors shape: torch.size(num_priors,4)
ground_truth (tensor): Ground truth boxes and labels for a batch,
shape: [batch_size,num_objs,5] (last idx is the label).
"""
loc_data, conf_data, landm_data = predictions
priors = priors
num = loc_data.size(0)
num_priors = (priors.size(0))
# match priors (default boxes) and ground truth boxes
loc_t = torch.Tensor(num, num_priors, 4)
landm_t = torch.Tensor(num, num_priors, 10)
conf_t = torch.LongTensor(num, num_priors)
for idx in range(num):
truths = targets[idx][:, :4].data
labels = targets[idx][:, -1].data
landms = targets[idx][:, 4:14].data
defaults = priors.data
match(self.threshold, truths, defaults, self.variance, labels, landms, loc_t, conf_t, landm_t, idx)
if GPU:
loc_t = loc_t.cuda()
conf_t = conf_t.cuda()
landm_t = landm_t.cuda()
zeros = torch.tensor(0).cuda()
# landm Loss (Smooth L1)
# Shape: [batch,num_priors,10]
pos1 = conf_t > zeros
num_pos_landm = pos1.long().sum(1, keepdim=True)
N1 = max(num_pos_landm.data.sum().float(), 1)
pos_idx1 = pos1.unsqueeze(pos1.dim()).expand_as(landm_data)
landm_p = landm_data[pos_idx1].view(-1, 10)
landm_t = landm_t[pos_idx1].view(-1, 10)
loss_landm = F.smooth_l1_loss(landm_p, landm_t, reduction='sum')
pos = conf_t != zeros
conf_t[pos] = 1
# Localization Loss (Smooth L1)
# Shape: [batch,num_priors,4]
pos_idx = pos.unsqueeze(pos.dim()).expand_as(loc_data)
loc_p = loc_data[pos_idx].view(-1, 4)
loc_t = loc_t[pos_idx].view(-1, 4)
loss_l = F.smooth_l1_loss(loc_p, loc_t, reduction='sum')
# Compute max conf across batch for hard negative mining
batch_conf = conf_data.view(-1, self.num_classes)
loss_c = log_sum_exp(batch_conf) - batch_conf.gather(1, conf_t.view(-1, 1))
# Hard Negative Mining
loss_c[pos.view(-1, 1)] = 0 # filter out pos boxes for now
loss_c = loss_c.view(num, -1)
_, loss_idx = loss_c.sort(1, descending=True)
_, idx_rank = loss_idx.sort(1)
num_pos = pos.long().sum(1, keepdim=True)
num_neg = torch.clamp(self.negpos_ratio*num_pos, max=pos.size(1)-1)
neg = idx_rank < num_neg.expand_as(idx_rank)
# Confidence Loss Including Positive and Negative Examples
pos_idx = pos.unsqueeze(2).expand_as(conf_data)
neg_idx = neg.unsqueeze(2).expand_as(conf_data)
conf_p = conf_data[(pos_idx+neg_idx).gt(0)].view(-1,self.num_classes)
targets_weighted = conf_t[(pos+neg).gt(0)]
loss_c = F.cross_entropy(conf_p, targets_weighted, reduction='sum')
# Sum of losses: L(x,c,l,g) = (Lconf(x, c) + αLloc(x,l,g)) / N
N = max(num_pos.data.sum().float(), 1)
loss_l /= N
loss_c /= N
loss_landm /= N1
return loss_l, loss_c, loss_landm
import time
import torch
import torch.nn as nn
import torchvision.models._utils as _utils
import torchvision.models as models
import torch.nn.functional as F
from torch.autograd import Variable
def conv_bn(inp, oup, stride = 1, leaky = 0):
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
nn.BatchNorm2d(oup),
nn.LeakyReLU(negative_slope=leaky, inplace=True)
)
def conv_bn_no_relu(inp, oup, stride):
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
nn.BatchNorm2d(oup),
)
def conv_bn1X1(inp, oup, stride, leaky=0):
return nn.Sequential(
nn.Conv2d(inp, oup, 1, stride, padding=0, bias=False),
nn.BatchNorm2d(oup),
nn.LeakyReLU(negative_slope=leaky, inplace=True)
)
def conv_dw(inp, oup, stride, leaky=0.1):
return nn.Sequential(
nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
nn.BatchNorm2d(inp),
nn.LeakyReLU(negative_slope= leaky,inplace=True),
nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
nn.LeakyReLU(negative_slope= leaky,inplace=True),
)
class SSH(nn.Module):
def __init__(self, in_channel, out_channel):
super(SSH, self).__init__()
assert out_channel % 4 == 0
leaky = 0
if (out_channel <= 64):
leaky = 0.1
self.conv3X3 = conv_bn_no_relu(in_channel, out_channel//2, stride=1)
self.conv5X5_1 = conv_bn(in_channel, out_channel//4, stride=1, leaky = leaky)
self.conv5X5_2 = conv_bn_no_relu(out_channel//4, out_channel//4, stride=1)
self.conv7X7_2 = conv_bn(out_channel//4, out_channel//4, stride=1, leaky = leaky)
self.conv7x7_3 = conv_bn_no_relu(out_channel//4, out_channel//4, stride=1)
def forward(self, input):
conv3X3 = self.conv3X3(input)
conv5X5_1 = self.conv5X5_1(input)
conv5X5 = self.conv5X5_2(conv5X5_1)
conv7X7_2 = self.conv7X7_2(conv5X5_1)
conv7X7 = self.conv7x7_3(conv7X7_2)
out = torch.cat([conv3X3, conv5X5, conv7X7], dim=1)
out = F.relu(out)
return out
class FPN(nn.Module):
def __init__(self,in_channels_list,out_channels):
super(FPN,self).__init__()
leaky = 0
if (out_channels <= 64):
leaky = 0.1
self.output1 = conv_bn1X1(in_channels_list[0], out_channels, stride = 1, leaky = leaky)
self.output2 = conv_bn1X1(in_channels_list[1], out_channels, stride = 1, leaky = leaky)
self.output3 = conv_bn1X1(in_channels_list[2], out_channels, stride = 1, leaky = leaky)
self.merge1 = conv_bn(out_channels, out_channels, leaky = leaky)
self.merge2 = conv_bn(out_channels, out_channels, leaky = leaky)
def forward(self, input):
# names = list(input.keys())
input = list(input.values())
output1 = self.output1(input[0])
output2 = self.output2(input[1])
output3 = self.output3(input[2])
up3 = F.interpolate(output3, size=[output2.size(2), output2.size(3)], mode="nearest")
output2 = output2 + up3
output2 = self.merge2(output2)
up2 = F.interpolate(output2, size=[output1.size(2), output1.size(3)], mode="nearest")
output1 = output1 + up2
output1 = self.merge1(output1)
out = [output1, output2, output3]
return out
class MobileNetV1(nn.Module):
def __init__(self):
super(MobileNetV1, self).__init__()
self.stage1 = nn.Sequential(
conv_bn(3, 8, 2, leaky = 0.1), # 3
conv_dw(8, 16, 1), # 7
conv_dw(16, 32, 2), # 11
conv_dw(32, 32, 1), # 19
conv_dw(32, 64, 2), # 27
conv_dw(64, 64, 1), # 43
)
self.stage2 = nn.Sequential(
conv_dw(64, 128, 2), # 43 + 16 = 59
conv_dw(128, 128, 1), # 59 + 32 = 91
conv_dw(128, 128, 1), # 91 + 32 = 123
conv_dw(128, 128, 1), # 123 + 32 = 155
conv_dw(128, 128, 1), # 155 + 32 = 187
conv_dw(128, 128, 1), # 187 + 32 = 219
)
self.stage3 = nn.Sequential(
conv_dw(128, 256, 2), # 219 +3 2 = 241
conv_dw(256, 256, 1), # 241 + 64 = 301
)
self.avg = nn.AdaptiveAvgPool2d((1,1))
self.fc = nn.Linear(256, 1000)
def forward(self, x):
x = self.stage1(x)
x = self.stage2(x)
x = self.stage3(x)
x = self.avg(x)
# x = self.model(x)
x = x.view(-1, 256)
x = self.fc(x)
return x
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment