Commit b952e97b authored by chenych

First Commit.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import cv2
import time
import torch
from external.nms import soft_nms
from models.decode import ddd_decode
from models.utils import flip_tensor
from utils.image import get_affine_transform
from utils.post_process import ddd_post_process
from utils.debugger import Debugger
from utils.ddd_utils import compute_box_3d, project_to_image, alpha2rot_y
from utils.ddd_utils import draw_box_3d, unproject_2d_to_3d
from progress.bar import Bar
from .base_detector import BaseDetector
import numpy as np
class DddDetector(BaseDetector):
def __init__(self, opt):
super(DddDetector, self).__init__(opt)
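    # Default camera calibration: a KITTI-style 3x4 projection matrix
    # (focal lengths, principal point, and translation terms), used when
    # pre_process() is not given a per-image calibration.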
self.calib = np.array([[707.0493, 0, 604.0814, 45.75831],
[0, 707.0493, 180.5066, -0.3454157],
[0, 0, 1., 0.004981016]], dtype=np.float32)
def pre_process(self, image, scale, calib=None):
height, width = image.shape[0:2]
inp_height, inp_width = self.opt.input_h, self.opt.input_w
c = np.array([width / 2, height / 2], dtype=np.float32)
if self.opt.keep_res:
s = np.array([inp_width, inp_height], dtype=np.int32)
else:
s = np.array([width, height], dtype=np.int32)
trans_input = get_affine_transform(c, s, 0, [inp_width, inp_height])
resized_image = image # cv2.resize(image, (width, height))
inp_image = cv2.warpAffine(
resized_image, trans_input, (inp_width, inp_height),
flags=cv2.INTER_LINEAR)
inp_image = (inp_image.astype(np.float32) / 255.)
inp_image = (inp_image - self.mean) / self.std
images = inp_image.transpose(2, 0, 1)[np.newaxis, ...]
calib = np.array(calib, dtype=np.float32) if calib is not None \
else self.calib
images = torch.from_numpy(images)
meta = {'c': c, 's': s,
'out_height': inp_height // self.opt.down_ratio,
'out_width': inp_width // self.opt.down_ratio,
'calib': calib}
return images, meta
def process(self, images, return_time=False):
with torch.no_grad():
torch.cuda.synchronize()
output = self.model(images)[-1]
output['hm'] = output['hm'].sigmoid_()
output['dep'] = 1. / (output['dep'].sigmoid() + 1e-6) - 1.
wh = output['wh'] if self.opt.reg_bbox else None
reg = output['reg'] if self.opt.reg_offset else None
torch.cuda.synchronize()
forward_time = time.time()
dets = ddd_decode(output['hm'], output['rot'], output['dep'],
output['dim'], wh=wh, reg=reg, K=self.opt.K)
if return_time:
return output, dets, forward_time
else:
return output, dets
def post_process(self, dets, meta, scale=1):
dets = dets.detach().cpu().numpy()
detections = ddd_post_process(
dets.copy(), [meta['c']], [meta['s']], [meta['calib']], self.opt)
self.this_calib = meta['calib']
return detections[0]
def merge_outputs(self, detections):
results = detections[0]
for j in range(1, self.num_classes + 1):
      if len(results[j]) > 0:
keep_inds = (results[j][:, -1] > self.opt.peak_thresh)
results[j] = results[j][keep_inds]
return results
def debug(self, debugger, images, dets, output, scale=1):
dets = dets.detach().cpu().numpy()
img = images[0].detach().cpu().numpy().transpose(1, 2, 0)
img = ((img * self.std + self.mean) * 255).astype(np.uint8)
pred = debugger.gen_colormap(output['hm'][0].detach().cpu().numpy())
debugger.add_blend_img(img, pred, 'pred_hm')
debugger.add_ct_detection(
img, dets[0], show_box=self.opt.reg_bbox,
center_thresh=self.opt.vis_thresh, img_id='det_pred')
def show_results(self, debugger, image, results):
debugger.add_3d_detection(
image, results, self.this_calib,
center_thresh=self.opt.vis_thresh, img_id='add_pred')
debugger.add_bird_view(
results, center_thresh=self.opt.vis_thresh, img_id='bird_pred')
debugger.show_all_imgs(pause=self.pause)
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from .exdet import ExdetDetector
from .ddd import DddDetector
from .ctdet import CtdetDetector
from .multi_pose import MultiPoseDetector
detector_factory = {
'exdet': ExdetDetector,
'ddd': DddDetector,
'ctdet': CtdetDetector,
'multi_pose': MultiPoseDetector,
}
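# A minimal usage sketch (hypothetical `opt` namespace; it must carry the
# fields the chosen detector reads, e.g. task, load_model, input_h/input_w):
#
#   Detector = detector_factory[opt.task]   # e.g. opt.task = 'ctdet'
#   detector = Detector(opt)
#   ret = detector.run('image.jpg')         # assuming the usual CenterNet
#                                           # BaseDetector.run() entry point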
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import _init_paths
import os
import cv2
import time
import torch
from external.nms import soft_nms
from models.decode import exct_decode, agnex_ct_decode
from models.utils import flip_tensor
from utils.image import get_affine_transform, transform_preds
from utils.post_process import ctdet_post_process
from utils.debugger import Debugger
from progress.bar import Bar
from .base_detector import BaseDetector
import numpy as np
class ExdetDetector(BaseDetector):
def __init__(self, opt):
super(ExdetDetector, self).__init__(opt)
self.decode = agnex_ct_decode if opt.agnostic_ex else exct_decode
def process(self, images, return_time=False):
with torch.no_grad():
torch.cuda.synchronize()
output = self.model(images)[-1]
t_heat = output['hm_t'].sigmoid_()
l_heat = output['hm_l'].sigmoid_()
b_heat = output['hm_b'].sigmoid_()
r_heat = output['hm_r'].sigmoid_()
c_heat = output['hm_c'].sigmoid_()
torch.cuda.synchronize()
forward_time = time.time()
if self.opt.reg_offset:
dets = self.decode(t_heat, l_heat, b_heat, r_heat, c_heat,
output['reg_t'], output['reg_l'],
output['reg_b'], output['reg_r'],
K=self.opt.K,
scores_thresh=self.opt.scores_thresh,
center_thresh=self.opt.center_thresh,
aggr_weight=self.opt.aggr_weight)
else:
dets = self.decode(t_heat, l_heat, b_heat, r_heat, c_heat, K=self.opt.K,
scores_thresh=self.opt.scores_thresh,
center_thresh=self.opt.center_thresh,
aggr_weight=self.opt.aggr_weight)
if return_time:
return output, dets, forward_time
else:
return output, dets
def debug(self, debugger, images, dets, output, scale=1):
detection = dets.detach().cpu().numpy().copy()
detection[:, :, :4] *= self.opt.down_ratio
for i in range(1):
inp_height, inp_width = images.shape[2], images.shape[3]
pred_hm = np.zeros((inp_height, inp_width, 3), dtype=np.uint8)
img = images[i].detach().cpu().numpy().transpose(1, 2, 0)
img = ((img * self.std + self.mean) * 255).astype(np.uint8)
parts = ['t', 'l', 'b', 'r', 'c']
for p in parts:
tag = 'hm_{}'.format(p)
pred = debugger.gen_colormap(
output[tag][i].detach().cpu().numpy(), (inp_height, inp_width))
if p != 'c':
pred_hm = np.maximum(pred_hm, pred)
else:
debugger.add_blend_img(
img, pred, 'pred_{}_{:.1f}'.format(p, scale))
debugger.add_blend_img(img, pred_hm, 'pred_{:.1f}'.format(scale))
debugger.add_img(img, img_id='out_{:.1f}'.format(scale))
for k in range(len(detection[i])):
# print('detection', detection[i, k, 4], detection[i, k])
if detection[i, k, 4] > 0.01:
# print('detection', detection[i, k, 4], detection[i, k])
debugger.add_coco_bbox(detection[i, k, :4], detection[i, k, -1],
detection[i, k, 4],
img_id='out_{:.1f}'.format(scale))
def post_process(self, dets, meta, scale=1):
out_width, out_height = meta['out_width'], meta['out_height']
dets = dets.detach().cpu().numpy().reshape(2, -1, 14)
dets[1, :, [0, 2]] = out_width - dets[1, :, [2, 0]]
dets = dets.reshape(1, -1, 14)
dets[0, :, 0:2] = transform_preds(
dets[0, :, 0:2], meta['c'], meta['s'], (out_width, out_height))
dets[0, :, 2:4] = transform_preds(
dets[0, :, 2:4], meta['c'], meta['s'], (out_width, out_height))
dets[:, :, 0:4] /= scale
return dets[0]
def merge_outputs(self, detections):
detections = np.concatenate(
[detection for detection in detections], axis=0).astype(np.float32)
classes = detections[..., -1]
keep_inds = (detections[:, 4] > 0)
detections = detections[keep_inds]
classes = classes[keep_inds]
results = {}
for j in range(self.num_classes):
keep_inds = (classes == j)
results[j + 1] = detections[keep_inds][:, 0:7].astype(np.float32)
soft_nms(results[j + 1], Nt=0.5, method=2)
results[j + 1] = results[j + 1][:, 0:5]
scores = np.hstack([
results[j][:, -1]
for j in range(1, self.num_classes + 1)
])
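    # Cap detections per image: np.partition finds the score of the
    # max_per_image-th best box, and everything below it is dropped.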
if len(scores) > self.max_per_image:
kth = len(scores) - self.max_per_image
thresh = np.partition(scores, kth)[kth]
for j in range(1, self.num_classes + 1):
keep_inds = (results[j][:, -1] >= thresh)
results[j] = results[j][keep_inds]
return results
def show_results(self, debugger, image, results):
debugger.add_img(image, img_id='exdet')
for j in range(1, self.num_classes + 1):
for bbox in results[j]:
if bbox[4] > self.opt.vis_thresh:
debugger.add_coco_bbox(
bbox[:4], j - 1, bbox[4], img_id='exdet')
debugger.show_all_imgs(pause=self.pause)
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import cv2
import time
import torch
from external.nms import soft_nms_39
from models.decode import multi_pose_decode, centerface_decode
from models.utils import flip_tensor, flip_lr_off, flip_lr
from utils.image import get_affine_transform
from utils.post_process import multi_pose_post_process
from utils.debugger import Debugger
from progress.bar import Bar
from .base_detector import BaseDetector
import numpy as np
class MultiPoseDetector(BaseDetector):
def __init__(self, opt):
super(MultiPoseDetector, self).__init__(opt)
self.flip_idx = opt.flip_idx
def process(self, images, return_time=False):
with torch.no_grad():
torch.cuda.synchronize()
output = self.model(images)[-1]
      # 'hm' is produced by a Sigmoid head inside the model (see MobileNetSeg),
      # so no extra activation is applied here.
      # if self.opt.hm_hp and not self.opt.mse_loss:
      #   output['hm_hp'] = output['hm_hp'].sigmoid_()
      reg = output['hm_offset'] if self.opt.reg_offset else None
      # The hm_hp / hp_offset heads are disabled for this model, but the
      # flip_test branch below still references them, so keep them as None:
      hm_hp = None    # output['hm_hp'] if self.opt.hm_hp else None
      hp_offset = None  # output['hp_offset'] if self.opt.reg_hp_offset else None
torch.cuda.synchronize()
forward_time = time.time()
if self.opt.flip_test:
output['hm'] = (output['hm'][0:1] +
flip_tensor(output['hm'][1:2])) / 2
output['wh'] = (output['wh'][0:1] +
flip_tensor(output['wh'][1:2])) / 2
output['hps'] = (output['hps'][0:1] +
flip_lr_off(output['hps'][1:2], self.flip_idx)) / 2
hm_hp = (hm_hp[0:1] + flip_lr(hm_hp[1:2], self.flip_idx)) / 2 \
if hm_hp is not None else None
reg = reg[0:1] if reg is not None else None
hp_offset = hp_offset[0:1] if hp_offset is not None else None
dets = centerface_decode(
output['hm'], output['wh'], output['landmarks'],
reg=reg, K=self.opt.K)
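      # Each decoded row is expected to hold 15 values:
      # [x1, y1, x2, y2, score, 10 landmark coordinates (5 points x 2)],
      # matching the reshape(-1, 15) in post_process below. Note that the
      # debug/show_results helpers further down still slice bbox[5:39],
      # the 17-keypoint layout inherited from CenterNet multi_pose.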
if return_time:
return output, dets, forward_time
else:
return output, dets
def post_process(self, dets, meta, scale=1):
dets = dets.detach().cpu().numpy().reshape(1, -1, dets.shape[2])
dets = multi_pose_post_process(
dets.copy(), [meta['c']], [meta['s']],
meta['out_height'], meta['out_width'])
for j in range(1, self.num_classes + 1):
dets[0][j] = np.array(
        dets[0][j], dtype=np.float32).reshape(-1, 15)  # 10 landmark values + 5 (bbox + score) = 15
# import pdb; pdb.set_trace()
dets[0][j][:, :4] /= scale
dets[0][j][:, 5:] /= scale
return dets[0]
def merge_outputs(self, detections):
results = {}
results[1] = np.concatenate(
[detection[1] for detection in detections], axis=0).astype(np.float32)
if self.opt.nms or len(self.opt.test_scales) > 1:
soft_nms_39(results[1], Nt=0.5, method=2)
results[1] = results[1].tolist()
return results
def debug(self, debugger, images, dets, output, scale=1):
dets = dets.detach().cpu().numpy().copy()
dets[:, :, :4] *= self.opt.down_ratio
dets[:, :, 5:39] *= self.opt.down_ratio
img = images[0].detach().cpu().numpy().transpose(1, 2, 0)
img = np.clip(((
img * self.std + self.mean) * 255.), 0, 255).astype(np.uint8)
pred = debugger.gen_colormap(output['hm'][0].detach().cpu().numpy())
debugger.add_blend_img(img, pred, 'pred_hm')
if self.opt.hm_hp:
pred = debugger.gen_colormap_hp(
output['hm_hp'][0].detach().cpu().numpy())
debugger.add_blend_img(img, pred, 'pred_hmhp')
def show_results(self, debugger, image, results):
debugger.add_img(image, img_id='multi_pose')
for bbox in results[1]:
if bbox[4] > self.opt.vis_thresh:
debugger.add_coco_bbox(
bbox[:4], 0, bbox[4], img_id='multi_pose')
debugger.add_coco_hp(bbox[5:39], img_id='multi_pose')
debugger.show_all_imgs(pause=self.pause)
def return_results(self, debugger, image, results):
debugger.add_img(image, img_id='multi_pose')
for bbox in results[1]:
if bbox[4] > self.opt.vis_thresh:
debugger.add_coco_bbox(
bbox[:4], 0, bbox[4], img_id='multi_pose')
debugger.add_coco_hp(bbox[5:39], img_id='multi_pose')
return debugger.return_img(img_id='multi_pose')
all:
python setup.py build_ext --inplace
rm -rf build
# --------------------------------------------------------
# Fast R-CNN
# Copyright (c) 2015 Microsoft
# Licensed under The MIT License [see LICENSE for details]
# Written by Ross Girshick
# --------------------------------------------------------
# ----------------------------------------------------------
# Soft-NMS: Improving Object Detection With One Line of Code
# Copyright (c) University of Maryland, College Park
# Licensed under The MIT License [see LICENSE for details]
# Written by Navaneeth Bodla and Bharat Singh
# ----------------------------------------------------------
import numpy as np
cimport numpy as np
cdef inline np.float32_t max(np.float32_t a, np.float32_t b):
return a if a >= b else b
cdef inline np.float32_t min(np.float32_t a, np.float32_t b):
return a if a <= b else b
def nms(np.ndarray[np.float32_t, ndim=2] dets, float thresh):
cdef np.ndarray[np.float32_t, ndim=1] x1 = dets[:, 0]
cdef np.ndarray[np.float32_t, ndim=1] y1 = dets[:, 1]
cdef np.ndarray[np.float32_t, ndim=1] x2 = dets[:, 2]
cdef np.ndarray[np.float32_t, ndim=1] y2 = dets[:, 3]
cdef np.ndarray[np.float32_t, ndim=1] scores = dets[:, 4]
cdef np.ndarray[np.float32_t, ndim=1] areas = (x2 - x1 + 1) * (y2 - y1 + 1)
cdef np.ndarray[np.int_t, ndim=1] order = scores.argsort()[::-1]
cdef int ndets = dets.shape[0]
    cdef np.ndarray[np.int_t, ndim=1] suppressed = \
        np.zeros(ndets, dtype=np.int_)
# nominal indices
cdef int _i, _j
# sorted indices
cdef int i, j
# temp variables for box i's (the box currently under consideration)
cdef np.float32_t ix1, iy1, ix2, iy2, iarea
# variables for computing overlap with box j (lower scoring box)
cdef np.float32_t xx1, yy1, xx2, yy2
cdef np.float32_t w, h
cdef np.float32_t inter, ovr
keep = []
for _i in range(ndets):
i = order[_i]
if suppressed[i] == 1:
continue
keep.append(i)
ix1 = x1[i]
iy1 = y1[i]
ix2 = x2[i]
iy2 = y2[i]
iarea = areas[i]
for _j in range(_i + 1, ndets):
j = order[_j]
if suppressed[j] == 1:
continue
xx1 = max(ix1, x1[j])
yy1 = max(iy1, y1[j])
xx2 = min(ix2, x2[j])
yy2 = min(iy2, y2[j])
w = max(0.0, xx2 - xx1 + 1)
h = max(0.0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / (iarea + areas[j] - inter)
if ovr >= thresh:
suppressed[j] = 1
return keep
def soft_nms(np.ndarray[float, ndim=2] boxes, float sigma=0.5, float Nt=0.3, float threshold=0.001, unsigned int method=0):
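    # In-place Soft-NMS over `boxes` (rows: x1, y1, x2, y2, score).
    # method 0: classic hard NMS; 1: linear rescoring (1 - IoU);
    # 2: Gaussian rescoring exp(-IoU^2 / sigma). Boxes whose rescored score
    # drops below `threshold` are swapped to the tail and N shrinks, so the
    # returned `keep` indices refer to the reordered array.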
cdef unsigned int N = boxes.shape[0]
cdef float iw, ih, box_area
cdef float ua
cdef int pos = 0
cdef float maxscore = 0
cdef int maxpos = 0
cdef float x1,x2,y1,y2,tx1,tx2,ty1,ty2,ts,area,weight,ov
for i in range(N):
maxscore = boxes[i, 4]
maxpos = i
tx1 = boxes[i,0]
ty1 = boxes[i,1]
tx2 = boxes[i,2]
ty2 = boxes[i,3]
ts = boxes[i,4]
pos = i + 1
# get max box
while pos < N:
if maxscore < boxes[pos, 4]:
maxscore = boxes[pos, 4]
maxpos = pos
pos = pos + 1
# add max box as a detection
boxes[i,0] = boxes[maxpos,0]
boxes[i,1] = boxes[maxpos,1]
boxes[i,2] = boxes[maxpos,2]
boxes[i,3] = boxes[maxpos,3]
boxes[i,4] = boxes[maxpos,4]
# swap ith box with position of max box
boxes[maxpos,0] = tx1
boxes[maxpos,1] = ty1
boxes[maxpos,2] = tx2
boxes[maxpos,3] = ty2
boxes[maxpos,4] = ts
tx1 = boxes[i,0]
ty1 = boxes[i,1]
tx2 = boxes[i,2]
ty2 = boxes[i,3]
ts = boxes[i,4]
pos = i + 1
# NMS iterations, note that N changes if detection boxes fall below threshold
while pos < N:
x1 = boxes[pos, 0]
y1 = boxes[pos, 1]
x2 = boxes[pos, 2]
y2 = boxes[pos, 3]
s = boxes[pos, 4]
area = (x2 - x1 + 1) * (y2 - y1 + 1)
iw = (min(tx2, x2) - max(tx1, x1) + 1)
if iw > 0:
ih = (min(ty2, y2) - max(ty1, y1) + 1)
if ih > 0:
ua = float((tx2 - tx1 + 1) * (ty2 - ty1 + 1) + area - iw * ih)
ov = iw * ih / ua #iou between max box and detection box
if method == 1: # linear
if ov > Nt:
weight = 1 - ov
else:
weight = 1
elif method == 2: # gaussian
weight = np.exp(-(ov * ov)/sigma)
else: # original NMS
if ov > Nt:
weight = 0
else:
weight = 1
boxes[pos, 4] = weight*boxes[pos, 4]
# if box score falls below threshold, discard the box by swapping with last box
# update N
if boxes[pos, 4] < threshold:
boxes[pos,0] = boxes[N-1, 0]
boxes[pos,1] = boxes[N-1, 1]
boxes[pos,2] = boxes[N-1, 2]
boxes[pos,3] = boxes[N-1, 3]
boxes[pos,4] = boxes[N-1, 4]
N = N - 1
pos = pos - 1
pos = pos + 1
keep = [i for i in range(N)]
return keep
def soft_nms_39(np.ndarray[float, ndim=2] boxes, float sigma=0.5, float Nt=0.3, float threshold=0.001, unsigned int method=0):
cdef unsigned int N = boxes.shape[0]
cdef float iw, ih, box_area
cdef float ua
cdef int pos = 0
cdef float maxscore = 0
cdef int maxpos = 0
cdef float x1,x2,y1,y2,tx1,tx2,ty1,ty2,ts,area,weight,ov
cdef float tmp
for i in range(N):
maxscore = boxes[i, 4]
maxpos = i
tx1 = boxes[i,0]
ty1 = boxes[i,1]
tx2 = boxes[i,2]
ty2 = boxes[i,3]
ts = boxes[i,4]
pos = i + 1
# get max box
while pos < N:
if maxscore < boxes[pos, 4]:
maxscore = boxes[pos, 4]
maxpos = pos
pos = pos + 1
# add max box as a detection
boxes[i,0] = boxes[maxpos,0]
boxes[i,1] = boxes[maxpos,1]
boxes[i,2] = boxes[maxpos,2]
boxes[i,3] = boxes[maxpos,3]
boxes[i,4] = boxes[maxpos,4]
# swap ith box with position of max box
boxes[maxpos,0] = tx1
boxes[maxpos,1] = ty1
boxes[maxpos,2] = tx2
boxes[maxpos,3] = ty2
boxes[maxpos,4] = ts
for j in range(5, 39):
tmp = boxes[i, j]
boxes[i, j] = boxes[maxpos, j]
boxes[maxpos, j] = tmp
tx1 = boxes[i,0]
ty1 = boxes[i,1]
tx2 = boxes[i,2]
ty2 = boxes[i,3]
ts = boxes[i,4]
pos = i + 1
# NMS iterations, note that N changes if detection boxes fall below threshold
while pos < N:
x1 = boxes[pos, 0]
y1 = boxes[pos, 1]
x2 = boxes[pos, 2]
y2 = boxes[pos, 3]
s = boxes[pos, 4]
area = (x2 - x1 + 1) * (y2 - y1 + 1)
iw = (min(tx2, x2) - max(tx1, x1) + 1)
if iw > 0:
ih = (min(ty2, y2) - max(ty1, y1) + 1)
if ih > 0:
ua = float((tx2 - tx1 + 1) * (ty2 - ty1 + 1) + area - iw * ih)
ov = iw * ih / ua #iou between max box and detection box
if method == 1: # linear
if ov > Nt:
weight = 1 - ov
else:
weight = 1
elif method == 2: # gaussian
weight = np.exp(-(ov * ov)/sigma)
else: # original NMS
if ov > Nt:
weight = 0
else:
weight = 1
boxes[pos, 4] = weight*boxes[pos, 4]
# if box score falls below threshold, discard the box by swapping with last box
# update N
if boxes[pos, 4] < threshold:
boxes[pos,0] = boxes[N-1, 0]
boxes[pos,1] = boxes[N-1, 1]
boxes[pos,2] = boxes[N-1, 2]
boxes[pos,3] = boxes[N-1, 3]
boxes[pos,4] = boxes[N-1, 4]
for j in range(5, 39):
tmp = boxes[pos, j]
boxes[pos, j] = boxes[N - 1, j]
boxes[N - 1, j] = tmp
N = N - 1
pos = pos - 1
pos = pos + 1
keep = [i for i in range(N)]
return keep
def soft_nms_merge(np.ndarray[float, ndim=2] boxes, float sigma=0.5, float Nt=0.3, float threshold=0.001, unsigned int method=0, float weight_exp=6):
cdef unsigned int N = boxes.shape[0]
cdef float iw, ih, box_area
cdef float ua
cdef int pos = 0
cdef float maxscore = 0
cdef int maxpos = 0
cdef float x1,x2,y1,y2,tx1,tx2,ty1,ty2,ts,area,weight,ov
cdef float mx1,mx2,my1,my2,mts,mbs,mw
for i in range(N):
maxscore = boxes[i, 4]
maxpos = i
tx1 = boxes[i,0]
ty1 = boxes[i,1]
tx2 = boxes[i,2]
ty2 = boxes[i,3]
ts = boxes[i,4]
pos = i + 1
# get max box
while pos < N:
if maxscore < boxes[pos, 4]:
maxscore = boxes[pos, 4]
maxpos = pos
pos = pos + 1
# add max box as a detection
boxes[i,0] = boxes[maxpos,0]
boxes[i,1] = boxes[maxpos,1]
boxes[i,2] = boxes[maxpos,2]
boxes[i,3] = boxes[maxpos,3]
boxes[i,4] = boxes[maxpos,4]
mx1 = boxes[i, 0] * boxes[i, 5]
my1 = boxes[i, 1] * boxes[i, 5]
mx2 = boxes[i, 2] * boxes[i, 6]
my2 = boxes[i, 3] * boxes[i, 6]
mts = boxes[i, 5]
mbs = boxes[i, 6]
# swap ith box with position of max box
boxes[maxpos,0] = tx1
boxes[maxpos,1] = ty1
boxes[maxpos,2] = tx2
boxes[maxpos,3] = ty2
boxes[maxpos,4] = ts
tx1 = boxes[i,0]
ty1 = boxes[i,1]
tx2 = boxes[i,2]
ty2 = boxes[i,3]
ts = boxes[i,4]
pos = i + 1
# NMS iterations, note that N changes if detection boxes fall below threshold
while pos < N:
x1 = boxes[pos, 0]
y1 = boxes[pos, 1]
x2 = boxes[pos, 2]
y2 = boxes[pos, 3]
s = boxes[pos, 4]
area = (x2 - x1 + 1) * (y2 - y1 + 1)
iw = (min(tx2, x2) - max(tx1, x1) + 1)
if iw > 0:
ih = (min(ty2, y2) - max(ty1, y1) + 1)
if ih > 0:
ua = float((tx2 - tx1 + 1) * (ty2 - ty1 + 1) + area - iw * ih)
ov = iw * ih / ua #iou between max box and detection box
if method == 1: # linear
if ov > Nt:
weight = 1 - ov
else:
weight = 1
elif method == 2: # gaussian
weight = np.exp(-(ov * ov)/sigma)
else: # original NMS
if ov > Nt:
weight = 0
else:
weight = 1
mw = (1 - weight) ** weight_exp
mx1 = mx1 + boxes[pos, 0] * boxes[pos, 5] * mw
my1 = my1 + boxes[pos, 1] * boxes[pos, 5] * mw
mx2 = mx2 + boxes[pos, 2] * boxes[pos, 6] * mw
my2 = my2 + boxes[pos, 3] * boxes[pos, 6] * mw
mts = mts + boxes[pos, 5] * mw
mbs = mbs + boxes[pos, 6] * mw
boxes[pos, 4] = weight*boxes[pos, 4]
# if box score falls below threshold, discard the box by swapping with last box
# update N
if boxes[pos, 4] < threshold:
boxes[pos,0] = boxes[N-1, 0]
boxes[pos,1] = boxes[N-1, 1]
boxes[pos,2] = boxes[N-1, 2]
boxes[pos,3] = boxes[N-1, 3]
boxes[pos,4] = boxes[N-1, 4]
N = N - 1
pos = pos - 1
pos = pos + 1
boxes[i, 0] = mx1 / mts
boxes[i, 1] = my1 / mts
boxes[i, 2] = mx2 / mbs
boxes[i, 3] = my2 / mbs
keep = [i for i in range(N)]
return keep
import numpy
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize
extensions = [
Extension(
"nms",
["nms.pyx"],
extra_compile_args=["-Wno-cpp", "-Wno-unused-function"]
)
]
setup(
name="coco",
ext_modules=cythonize(extensions),
include_dirs=[numpy.get_include()]
)
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# Code referenced from https://gist.github.com/gyglim/1f8dfb1b5c82627ae3efcfbbadb9f514
import os
import time
import sys
import torch
USE_TENSORBOARD = True
try:
import tensorboardX
print('Using tensorboardX')
except ImportError:
USE_TENSORBOARD = False
class Logger(object):
def __init__(self, opt):
"""Create a summary writer logging to log_dir."""
if not os.path.exists(opt.save_dir):
os.makedirs(opt.save_dir)
if not os.path.exists(opt.debug_dir):
os.makedirs(opt.debug_dir)
time_str = time.strftime('%Y-%m-%d-%H-%M')
args = dict((name, getattr(opt, name)) for name in dir(opt)
if not name.startswith('_'))
file_name = os.path.join(opt.save_dir, 'opt.txt')
with open(file_name, 'wt') as opt_file:
opt_file.write('==> torch version: {}\n'.format(torch.__version__))
opt_file.write('==> cudnn version: {}\n'.format(
torch.backends.cudnn.version()))
opt_file.write('==> Cmd:\n')
opt_file.write(str(sys.argv))
opt_file.write('\n==> Opt:\n')
for k, v in sorted(args.items()):
opt_file.write(' %s: %s\n' % (str(k), str(v)))
log_dir = opt.save_dir + '/logs_{}'.format(time_str)
if USE_TENSORBOARD:
self.writer = tensorboardX.SummaryWriter(logdir=log_dir)
else:
if not os.path.exists(os.path.dirname(log_dir)):
os.mkdir(os.path.dirname(log_dir))
if not os.path.exists(log_dir):
os.mkdir(log_dir)
self.log = open(log_dir + '/log.txt', 'w')
try:
os.system('cp {}/opt.txt {}/'.format(opt.save_dir, log_dir))
    except Exception:
pass
self.start_line = True
def write(self, txt):
if self.start_line:
time_str = time.strftime('%Y-%m-%d-%H-%M')
self.log.write('{}: {}'.format(time_str, txt))
else:
self.log.write(txt)
self.start_line = False
if '\n' in txt:
self.start_line = True
self.log.flush()
def close(self):
self.log.close()
def scalar_summary(self, tag, value, step):
"""Log a scalar variable."""
if USE_TENSORBOARD:
self.writer.add_scalar(tag, value, step)
from torch import nn
import torch.utils.model_zoo as model_zoo
from collections import OrderedDict
import math
__all__ = ['MobileNetV2']
model_urls = {
'mobilenet_v2': 'https://download.pytorch.org/models/mobilenet_v2-b0353104.pth',
}
def _make_divisible(v, divisor, min_value=None):
"""
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
:param v:
:param divisor:
:param min_value:
:return:
"""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
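# For example (a quick sanity check of the rounding rule):
#   _make_divisible(32 * 0.5, 8) -> 16
#   _make_divisible(37, 8)       -> 40   (41 // 8 * 8 = 40, and 40 >= 0.9 * 37)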
class ConvBNReLU(nn.Sequential):
def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
padding = (kernel_size - 1) // 2
super(ConvBNReLU, self).__init__(
nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, groups=groups, bias=False),
nn.BatchNorm2d(out_planes),
nn.ReLU6(inplace=True)
)
class InvertedResidual(nn.Module):
def __init__(self, inp, oup, stride, expand_ratio):
super(InvertedResidual, self).__init__()
self.stride = stride
assert stride in [1, 2]
hidden_dim = int(round(inp * expand_ratio))
self.use_res_connect = self.stride == 1 and inp == oup
layers = []
if expand_ratio != 1:
# pw
layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1))
layers.extend([
# dw
ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
])
self.conv = nn.Sequential(*layers)
def forward(self, x):
if self.use_res_connect:
return x + self.conv(x)
else:
return self.conv(x)
class MobileNetV2(nn.Module):
def __init__(self,width_mult=1.0,round_nearest=8,):
super(MobileNetV2, self).__init__()
block = InvertedResidual
input_channel = 32
inverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1], # 0
[6, 24, 2, 2], # 1
[6, 32, 3, 2], # 2
[6, 64, 4, 2], # 3
[6, 96, 3, 1], # 4
[6, 160, 3, 2],# 5
[6, 320, 1, 1],# 6
]
self.feat_id = [1,2,4,6]
self.feat_channel = []
# only check the first element, assuming user knows t,c,n,s are required
if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
raise ValueError("inverted_residual_setting should be non-empty "
"or a 4-element list, got {}".format(inverted_residual_setting))
# building first layer
input_channel = _make_divisible(input_channel * width_mult, round_nearest)
features = [ConvBNReLU(3, input_channel, stride=2)]
# building inverted residual blocks
for id,(t, c, n, s) in enumerate(inverted_residual_setting):
output_channel = _make_divisible(c * width_mult, round_nearest)
for i in range(n):
stride = s if i == 0 else 1
features.append(block(input_channel, output_channel, stride, expand_ratio=t))
input_channel = output_channel
if id in self.feat_id :
self.__setattr__("feature_%d"%id,nn.Sequential(*features))
self.feat_channel.append(output_channel)
features = []
# weight initialization
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
def forward(self, x):
y = []
for id in self.feat_id:
x = self.__getattr__("feature_%d"%id)(x)
y.append(x)
return y
def load_model(model,state_dict):
new_model=model.state_dict()
new_keys = list(new_model.keys())
old_keys = list(state_dict.keys())
restore_dict = OrderedDict()
for id in range(len(new_keys)):
restore_dict[new_keys[id]] = state_dict[old_keys[id]]
model.load_state_dict(restore_dict)
def dict2list(func):
def wrap(*args, **kwargs):
self = args[0]
x = args[1]
ret_list = []
ret = func(self, x)
for k, v in ret[0].items():
ret_list.append(v)
return ret_list
return wrap
def fill_up_weights(up):
w = up.weight.data
f = math.ceil(w.size(2) / 2)
c = (2 * f - 1 - f % 2) / (2. * f)
for i in range(w.size(2)):
for j in range(w.size(3)):
w[0, 0, i, j] = \
(1 - math.fabs(i / f - c)) * (1 - math.fabs(j / f - c))
for c in range(1, w.size(0)):
w[c, 0, :, :] = w[0, 0, :, :]
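# fill_up_weights initializes a (grouped) transposed convolution with a fixed
# bilinear-interpolation kernel, so the layer starts out as a plain bilinear
# upsampler and can be refined during training.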
def fill_fc_weights(layers):
for m in layers.modules():
if isinstance(m, nn.Conv2d):
nn.init.normal_(m.weight, std=0.001)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
class IDAUp(nn.Module):
def __init__(self, out_dim, channel):
super(IDAUp, self).__init__()
self.out_dim = out_dim
self.up = nn.Sequential(
nn.ConvTranspose2d(
out_dim, out_dim, kernel_size=2, stride=2, padding=0,
output_padding=0, groups=out_dim, bias=False),
nn.BatchNorm2d(out_dim,eps=0.001,momentum=0.1),
nn.ReLU())
self.conv = nn.Sequential(
nn.Conv2d(channel, out_dim,
kernel_size=1, stride=1, bias=False),
nn.BatchNorm2d(out_dim,eps=0.001,momentum=0.1),
nn.ReLU(inplace=True))
def forward(self, layers):
layers = list(layers)
x = self.up(layers[0])
y = self.conv(layers[1])
out = x + y
return out
class MobileNetUp(nn.Module):
def __init__(self, channels, out_dim = 24):
super(MobileNetUp, self).__init__()
channels = channels[::-1]
self.conv = nn.Sequential(
nn.Conv2d(channels[0], out_dim,
kernel_size=1, stride=1, bias=False),
nn.BatchNorm2d(out_dim,eps=0.001,momentum=0.1),
nn.ReLU(inplace=True))
self.conv_last = nn.Sequential(
nn.Conv2d(out_dim,out_dim,
kernel_size=3, stride=1, padding=1 ,bias=False),
nn.BatchNorm2d(out_dim,eps=1e-5,momentum=0.01),
nn.ReLU(inplace=True))
for i,channel in enumerate(channels[1:]):
setattr(self,'up_%d'%(i),IDAUp(out_dim,channel))
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m,nn.ConvTranspose2d):
fill_up_weights(m)
def forward(self, layers):
layers = list(layers)
assert len(layers) > 1
x = self.conv(layers[-1])
for i in range(0,len(layers)-1):
up = getattr(self, 'up_{}'.format(i))
x = up([x,layers[len(layers)-2-i]])
x = self.conv_last(x)
return x
class MobileNetSeg(nn.Module):
def __init__(self, base_name,heads,head_conv=24, pretrained = True):
super(MobileNetSeg, self).__init__()
self.heads = heads
self.base = globals()[base_name](
pretrained=pretrained)
channels = self.base.feat_channel
self.dla_up = MobileNetUp(channels, out_dim=head_conv)
for head in self.heads:
classes = self.heads[head]
if head == 'hm':
fc = nn.Sequential(
nn.Conv2d(head_conv, classes,
kernel_size=1, stride=1,
padding=0, bias=True),
nn.Sigmoid()
)
else:
fc = nn.Conv2d(head_conv, classes,
kernel_size=1, stride=1,
padding=0, bias=True)
# if 'hm' in head:
# fc.bias.data.fill_(-2.19)
# else:
# nn.init.normal_(fc.weight, std=0.001)
# nn.init.constant_(fc.bias, 0)
self.__setattr__(head, fc)
  # @dict2list  # needed for ONNX export: converts forward()'s dict output into a list
def forward(self, x):
x = self.base(x)
x = self.dla_up(x)
ret = {}
for head in self.heads:
ret[head] = self.__getattr__(head)(x)
return [ret]
def mobilenetv2_10(pretrained=True, **kwargs):
model = MobileNetV2(width_mult=1.0)
if pretrained:
state_dict = model_zoo.load_url(model_urls['mobilenet_v2'],
progress=True)
load_model(model,state_dict)
return model
def mobilenetv2_5(pretrained=False, **kwargs):
model = MobileNetV2(width_mult=0.5)
if pretrained:
print('This version does not have pretrain weights.')
return model
# num_layers : [10 , 5]
def get_mobile_net(num_layers, heads, head_conv=24):
model = MobileNetSeg('mobilenetv2_{}'.format(num_layers), heads,
pretrained=True,
head_conv=head_conv)
return model
if __name__ == '__main__':
import torch
input = torch.zeros([1,3,416,416])
  model = get_mobile_net(10, {'hm':1, 'hm_offset':2, 'wh':2, 'landmarks':10}, head_conv=24)  # 'hm' channels = number of object classes; this head set only supports box detection
  res = model(input)
  # forward() returns a one-element list holding a dict of head outputs
  for head, out in res[0].items():
    print(head, out.shape)
from torch import nn
import torch.utils.model_zoo as model_zoo
from collections import OrderedDict
import math
__all__ = ['MobileNetV2']
model_urls = {
'mobilenet_v2': 'https://download.pytorch.org/models/mobilenet_v2-b0353104.pth',
}
def _make_divisible(v, divisor, min_value=None):
"""
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
:param v:
:param divisor:
:param min_value:
:return:
"""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
class ConvBNReLU(nn.Sequential):
def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
padding = (kernel_size - 1) // 2
super(ConvBNReLU, self).__init__(
nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, groups=groups, bias=False),
nn.BatchNorm2d(out_planes),
nn.ReLU6(inplace=True)
)
class InvertedResidual(nn.Module):
def __init__(self, inp, oup, stride, expand_ratio):
super(InvertedResidual, self).__init__()
self.stride = stride
assert stride in [1, 2]
hidden_dim = int(round(inp * expand_ratio))
self.use_res_connect = self.stride == 1 and inp == oup
layers = []
if expand_ratio != 1:
# pw
layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1))
layers.extend([
# dw
ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
])
self.conv = nn.Sequential(*layers)
def forward(self, x):
if self.use_res_connect:
return x + self.conv(x)
else:
return self.conv(x)
class MobileNetV2(nn.Module):
def __init__(self,width_mult=1.0,round_nearest=8,):
super(MobileNetV2, self).__init__()
block = InvertedResidual
input_channel = 32
inverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1], # 0
[6, 24, 2, 2], # 1
[6, 32, 3, 2], # 2
[6, 64, 4, 2], # 3
[6, 96, 3, 1], # 4
[6, 160, 3, 2],# 5
[6, 320, 1, 1],# 6
]
self.feat_id = [1,2,4,6]
self.feat_channel = []
# only check the first element, assuming user knows t,c,n,s are required
if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
raise ValueError("inverted_residual_setting should be non-empty "
"or a 4-element list, got {}".format(inverted_residual_setting))
# building first layer
input_channel = _make_divisible(input_channel * width_mult, round_nearest)
features = [ConvBNReLU(3, input_channel, stride=2)]
# building inverted residual blocks
for id,(t, c, n, s) in enumerate(inverted_residual_setting):
output_channel = _make_divisible(c * width_mult, round_nearest)
for i in range(n):
stride = s if i == 0 else 1
features.append(block(input_channel, output_channel, stride, expand_ratio=t))
input_channel = output_channel
if id in self.feat_id :
self.__setattr__("feature_%d"%id,nn.Sequential(*features))
self.feat_channel.append(output_channel)
features = []
# weight initialization
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
def forward(self, x):
y = []
for id in self.feat_id:
x = self.__getattr__("feature_%d"%id)(x)
y.append(x)
return y
def load_model(model,state_dict):
new_model=model.state_dict()
new_keys = list(new_model.keys())
old_keys = list(state_dict.keys())
restore_dict = OrderedDict()
for id in range(len(new_keys)):
restore_dict[new_keys[id]] = state_dict[old_keys[id]]
model.load_state_dict(restore_dict)
def dict2list(func):
def wrap(*args, **kwargs):
self = args[0]
x = args[1]
ret_list = []
ret = func(self, x)
for k, v in ret[0].items():
ret_list.append(v)
return ret_list
return wrap
def fill_up_weights(up):
w = up.weight.data
f = math.ceil(w.size(2) / 2)
c = (2 * f - 1 - f % 2) / (2. * f)
for i in range(w.size(2)):
for j in range(w.size(3)):
w[0, 0, i, j] = \
(1 - math.fabs(i / f - c)) * (1 - math.fabs(j / f - c))
for c in range(1, w.size(0)):
w[c, 0, :, :] = w[0, 0, :, :]
def fill_fc_weights(layers):
for m in layers.modules():
if isinstance(m, nn.Conv2d):
nn.init.normal_(m.weight, std=0.001)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
class IDAUp(nn.Module):
def __init__(self, out_dim, channel):
super(IDAUp, self).__init__()
self.out_dim = out_dim
self.up = nn.Sequential(
nn.ConvTranspose2d(
out_dim, out_dim, kernel_size=2, stride=2, padding=0,
output_padding=0, groups=out_dim, bias=False),
nn.BatchNorm2d(out_dim,eps=0.001,momentum=0.1),
nn.ReLU())
self.conv = nn.Sequential(
nn.Conv2d(channel, out_dim,
kernel_size=1, stride=1, bias=False),
nn.BatchNorm2d(out_dim,eps=0.001,momentum=0.1),
nn.ReLU(inplace=True))
# self.smooth = nn.Conv2d(out_dim, out_dim, kernel_size=3, stride=1, padding=1)
def forward(self, layers):
layers = list(layers)
x = self.up(layers[0])
y = self.conv(layers[1])
# out = self.smooth(x + y)
out = x + y
return out
class MobileNetUp(nn.Module):
def __init__(self, channels, out_dim = 24):
super(MobileNetUp, self).__init__()
channels = channels[::-1]
self.conv = nn.Sequential(
nn.Conv2d(channels[0], out_dim,
kernel_size=1, stride=1, bias=False),
nn.BatchNorm2d(out_dim,eps=0.001,momentum=0.1),
nn.ReLU(inplace=True))
self.conv_last = nn.Sequential(
nn.Conv2d(out_dim,out_dim,
kernel_size=3, stride=1, padding=1 ,bias=False),
nn.BatchNorm2d(out_dim,eps=1e-5,momentum=0.01),
nn.ReLU(inplace=True))
for i,channel in enumerate(channels[1:]):
setattr(self,'up_%d'%(i),IDAUp(out_dim,channel))
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
elif isinstance(m,nn.ConvTranspose2d):
fill_up_weights(m)
def forward(self, layers):
layers = list(layers)
assert len(layers) > 1
x = self.conv(layers[-1])
for i in range(0,len(layers)-1):
up = getattr(self, 'up_{}'.format(i))
x = up([x,layers[len(layers)-2-i]])
x = self.conv_last(x)
return x
class MobileNetSeg(nn.Module):
def __init__(self, base_name,heads,head_conv=24, pretrained = True):
super(MobileNetSeg, self).__init__()
self.heads = heads
self.base = globals()[base_name](
pretrained=pretrained)
channels = self.base.feat_channel
self.dla_up = MobileNetUp(channels, out_dim=head_conv)
for head in self.heads:
classes = self.heads[head]
if head == 'hm':
fc = nn.Sequential(
nn.Conv2d(head_conv, classes,
kernel_size=1, stride=1,
padding=0, bias=True),
nn.Sigmoid()
)
else:
fc = nn.Conv2d(head_conv, classes,
kernel_size=1, stride=1,
padding=0, bias=True)
# if 'hm' in head:
# fc.bias.data.fill_(-2.19)
# else:
# nn.init.normal_(fc.weight, std=0.001)
# nn.init.constant_(fc.bias, 0)
self.__setattr__(head, fc)
  # @dict2list  # needed for ONNX export: converts forward()'s dict output into a list
def forward(self, x):
x = self.base(x)
x = self.dla_up(x)
ret = {}
for head in self.heads:
ret[head] = self.__getattr__(head)(x)
return [ret]
def mobilenetv2_10(pretrained=True, **kwargs):
model = MobileNetV2(width_mult=1.0)
if pretrained:
state_dict = model_zoo.load_url(model_urls['mobilenet_v2'],
progress=True)
load_model(model,state_dict)
return model
def mobilenetv2_5(pretrained=False, **kwargs):
model = MobileNetV2(width_mult=0.5)
if pretrained:
print('This version does not have pretrain weights.')
return model
# num_layers : [10 , 5]
def get_mobile_net(num_layers, heads, head_conv=24):
model = MobileNetSeg('mobilenetv2_{}'.format(num_layers), heads,
pretrained=True,
head_conv=head_conv)
return model
if __name__ == '__main__':
import torch
input = torch.zeros([1,3,416,416])
  model = get_mobile_net(10, {'hm':1, 'hm_offset':2, 'wh':2, 'landmarks':10}, head_conv=24)  # 'hm' channels = number of object classes; this head set only supports box detection
  res = model(input)
  # forward() returns a one-element list holding a dict of head outputs
  for head, out in res[0].items():
    print(head, out.shape)
import math
from collections import OrderedDict
import torch
import torch.nn as nn
import torch.nn.functional as F
class BasicBlock(nn.Module):
def __init__(self, inplanes, planes):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(inplanes, planes[0], kernel_size=1,
stride=1, padding=0, bias=False)
self.bn1 = nn.BatchNorm2d(planes[0])
self.relu1 = nn.LeakyReLU(0.1)
self.conv2 = nn.Conv2d(planes[0], planes[1], kernel_size=3,
stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes[1])
self.relu2 = nn.LeakyReLU(0.1)
def forward(self, x):
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu1(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu2(out)
out += residual
return out
class DarkNet(nn.Module):
def __init__(self, layers):
super(DarkNet, self).__init__()
self.inplanes = 32
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(self.inplanes)
self.relu1 = nn.LeakyReLU(0.1)
self.layer1 = self._make_layer([32, 64], layers[0])
self.layer2 = self._make_layer([64, 128], layers[1])
self.layer3 = self._make_layer([128, 256], layers[2])
#self.layer4 = self._make_layer([256, 512], layers[3])
#self.layer5 = self._make_layer([512, 1024], layers[4])
self.layers_out_filters = [64, 128, 256]
for m in self.modules():
if isinstance(m, nn.BatchNorm2d):
m.weight.data.fill_(1)
m.bias.data.zero_()
def _make_layer(self, planes, blocks):
layers = []
# downsample
layers.append(("ds_conv", nn.Conv2d(self.inplanes, planes[1], kernel_size=3,
stride=2, padding=1, bias=False)))
layers.append(("ds_bn", nn.BatchNorm2d(planes[1])))
layers.append(("ds_relu", nn.LeakyReLU(0.1)))
# blocks
self.inplanes = planes[1]
for i in range(0, blocks):
layers.append(("residual_{}".format(i), BasicBlock(self.inplanes, planes)))
return nn.Sequential(OrderedDict(layers))
def forward(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu1(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = F.interpolate(x, size=(128, 128),
mode="bilinear", align_corners=True)
return x
def darknet21(cfg,is_train=True, **kwargs):
model = DarkNet([1, 1, 2, 2, 1])
if is_train and cfg.BACKBONE.INIT_WEIGHTS:
if isinstance(cfg.BACKBONE.PRETRAINED, str):
model.load_state_dict(torch.load(cfg.BACKBONE.PRETRAINED))
else:
raise Exception("darknet request a pretrained path. got [{}]".format(cfg.BACKBONE.PRETRAINED))
return model
def darknet53(num_layers, cfg):
model = DarkNet([1, 2, 8])
#if is_train and cfg.BACKBONE.INIT_WEIGHTS:
# if isinstance(cfg.BACKBONE.PRETRAINED, str):
# model.load_state_dict(torch.load(cfg.BACKBONE.PRETRAINED))
# else:
# raise Exception("darknet request a pretrained path. got [{}]".format(cfg.BACKBONE.PRETRAINED))
return model
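# Note: despite its name, this builder only stacks the first three DarkNet
# stages ([1, 2, 8] residual blocks), and the forward pass bilinearly resizes
# the final feature map to a fixed 128x128 before returning it.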
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import math
from os.path import join
import numpy as np
import torch
import torch.utils.model_zoo as model_zoo
from torch import nn
BatchNorm = nn.BatchNorm2d
def get_model_url(data='imagenet', name='dla34', hash='ba72cf86'):
return join('http://dl.yf.io/dla/models', data, '{}-{}.pth'.format(name, hash))
def conv3x3(in_planes, out_planes, stride=1):
"3x3 convolution with padding"
return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
padding=1, bias=False)
class BasicBlock(nn.Module):
def __init__(self, inplanes, planes, stride=1, dilation=1):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=3,
stride=stride, padding=dilation,
bias=False, dilation=dilation)
self.bn1 = BatchNorm(planes)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
stride=1, padding=dilation,
bias=False, dilation=dilation)
self.bn2 = BatchNorm(planes)
self.stride = stride
def forward(self, x, residual=None):
if residual is None:
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out += residual
out = self.relu(out)
return out
class Bottleneck(nn.Module):
expansion = 2
def __init__(self, inplanes, planes, stride=1, dilation=1):
super(Bottleneck, self).__init__()
expansion = Bottleneck.expansion
bottle_planes = planes // expansion
self.conv1 = nn.Conv2d(inplanes, bottle_planes,
kernel_size=1, bias=False)
self.bn1 = BatchNorm(bottle_planes)
self.conv2 = nn.Conv2d(bottle_planes, bottle_planes, kernel_size=3,
stride=stride, padding=dilation,
bias=False, dilation=dilation)
self.bn2 = BatchNorm(bottle_planes)
self.conv3 = nn.Conv2d(bottle_planes, planes,
kernel_size=1, bias=False)
self.bn3 = BatchNorm(planes)
self.relu = nn.ReLU(inplace=True)
self.stride = stride
def forward(self, x, residual=None):
if residual is None:
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
out += residual
out = self.relu(out)
return out
class BottleneckX(nn.Module):
expansion = 2
cardinality = 32
def __init__(self, inplanes, planes, stride=1, dilation=1):
super(BottleneckX, self).__init__()
cardinality = BottleneckX.cardinality
# dim = int(math.floor(planes * (BottleneckV5.expansion / 64.0)))
# bottle_planes = dim * cardinality
bottle_planes = planes * cardinality // 32
self.conv1 = nn.Conv2d(inplanes, bottle_planes,
kernel_size=1, bias=False)
self.bn1 = BatchNorm(bottle_planes)
self.conv2 = nn.Conv2d(bottle_planes, bottle_planes, kernel_size=3,
stride=stride, padding=dilation, bias=False,
dilation=dilation, groups=cardinality)
self.bn2 = BatchNorm(bottle_planes)
self.conv3 = nn.Conv2d(bottle_planes, planes,
kernel_size=1, bias=False)
self.bn3 = BatchNorm(planes)
self.relu = nn.ReLU(inplace=True)
self.stride = stride
def forward(self, x, residual=None):
if residual is None:
residual = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
out += residual
out = self.relu(out)
return out
class Root(nn.Module):
def __init__(self, in_channels, out_channels, kernel_size, residual):
super(Root, self).__init__()
self.conv = nn.Conv2d(
in_channels, out_channels, 1,
stride=1, bias=False, padding=(kernel_size - 1) // 2)
self.bn = BatchNorm(out_channels)
self.relu = nn.ReLU(inplace=True)
self.residual = residual
def forward(self, *x):
children = x
x = self.conv(torch.cat(x, 1))
x = self.bn(x)
if self.residual:
x += children[0]
x = self.relu(x)
return x
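# A Root node fuses its children: the inputs are concatenated along the
# channel axis, projected by a 1x1 conv + BN, optionally given a residual
# connection to the first child, and passed through ReLU.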
class Tree(nn.Module):
def __init__(self, levels, block, in_channels, out_channels, stride=1,
level_root=False, root_dim=0, root_kernel_size=1,
dilation=1, root_residual=False):
super(Tree, self).__init__()
if root_dim == 0:
root_dim = 2 * out_channels
if level_root:
root_dim += in_channels
if levels == 1:
self.tree1 = block(in_channels, out_channels, stride,
dilation=dilation)
self.tree2 = block(out_channels, out_channels, 1,
dilation=dilation)
else:
self.tree1 = Tree(levels - 1, block, in_channels, out_channels,
stride, root_dim=0,
root_kernel_size=root_kernel_size,
dilation=dilation, root_residual=root_residual)
self.tree2 = Tree(levels - 1, block, out_channels, out_channels,
root_dim=root_dim + out_channels,
root_kernel_size=root_kernel_size,
dilation=dilation, root_residual=root_residual)
if levels == 1:
self.root = Root(root_dim, out_channels, root_kernel_size,
root_residual)
self.level_root = level_root
self.root_dim = root_dim
self.downsample = None
self.project = None
self.levels = levels
if stride > 1:
self.downsample = nn.MaxPool2d(stride, stride=stride)
if in_channels != out_channels:
self.project = nn.Sequential(
nn.Conv2d(in_channels, out_channels,
kernel_size=1, stride=1, bias=False),
BatchNorm(out_channels)
)
def forward(self, x, residual=None, children=None):
children = [] if children is None else children
bottom = self.downsample(x) if self.downsample else x
residual = self.project(bottom) if self.project else bottom
if self.level_root:
children.append(bottom)
x1 = self.tree1(x, residual)
if self.levels == 1:
x2 = self.tree2(x1)
x = self.root(x2, x1, *children)
else:
children.append(x1)
x = self.tree2(x1, children=children)
return x
class DLA(nn.Module):
def __init__(self, levels, channels, num_classes=1000,
block=BasicBlock, residual_root=False, return_levels=False,
pool_size=7, linear_root=False):
super(DLA, self).__init__()
self.channels = channels
self.return_levels = return_levels
self.num_classes = num_classes
self.base_layer = nn.Sequential(
nn.Conv2d(3, channels[0], kernel_size=7, stride=1,
padding=3, bias=False),
BatchNorm(channels[0]),
nn.ReLU(inplace=True))
self.level0 = self._make_conv_level(
channels[0], channels[0], levels[0])
self.level1 = self._make_conv_level(
channels[0], channels[1], levels[1], stride=2)
self.level2 = Tree(levels[2], block, channels[1], channels[2], 2,
level_root=False,
root_residual=residual_root)
self.level3 = Tree(levels[3], block, channels[2], channels[3], 2,
level_root=True, root_residual=residual_root)
self.level4 = Tree(levels[4], block, channels[3], channels[4], 2,
level_root=True, root_residual=residual_root)
self.level5 = Tree(levels[5], block, channels[4], channels[5], 2,
level_root=True, root_residual=residual_root)
self.avgpool = nn.AvgPool2d(pool_size)
self.fc = nn.Conv2d(channels[-1], num_classes, kernel_size=1,
stride=1, padding=0, bias=True)
for m in self.modules():
if isinstance(m, nn.Conv2d):
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data.normal_(0, math.sqrt(2. / n))
elif isinstance(m, BatchNorm):
m.weight.data.fill_(1)
m.bias.data.zero_()
def _make_level(self, block, inplanes, planes, blocks, stride=1):
downsample = None
if stride != 1 or inplanes != planes:
downsample = nn.Sequential(
nn.MaxPool2d(stride, stride=stride),
nn.Conv2d(inplanes, planes,
kernel_size=1, stride=1, bias=False),
BatchNorm(planes),
)
layers = []
layers.append(block(inplanes, planes, stride, downsample=downsample))
for i in range(1, blocks):
layers.append(block(inplanes, planes))
return nn.Sequential(*layers)
def _make_conv_level(self, inplanes, planes, convs, stride=1, dilation=1):
modules = []
for i in range(convs):
modules.extend([
nn.Conv2d(inplanes, planes, kernel_size=3,
stride=stride if i == 0 else 1,
padding=dilation, bias=False, dilation=dilation),
BatchNorm(planes),
nn.ReLU(inplace=True)])
inplanes = planes
return nn.Sequential(*modules)
def forward(self, x):
y = []
x = self.base_layer(x)
for i in range(6):
x = getattr(self, 'level{}'.format(i))(x)
y.append(x)
if self.return_levels:
return y
else:
x = self.avgpool(x)
x = self.fc(x)
x = x.view(x.size(0), -1)
return x
def load_pretrained_model(self, data='imagenet', name='dla34', hash='ba72cf86'):
fc = self.fc
if name.endswith('.pth'):
model_weights = torch.load(data + name)
else:
model_url = get_model_url(data, name, hash)
model_weights = model_zoo.load_url(model_url)
num_classes = len(model_weights[list(model_weights.keys())[-1]])
self.fc = nn.Conv2d(
self.channels[-1], num_classes,
kernel_size=1, stride=1, padding=0, bias=True)
self.load_state_dict(model_weights)
self.fc = fc
def dla34(pretrained, **kwargs): # DLA-34
model = DLA([1, 1, 1, 2, 2, 1],
[16, 32, 64, 128, 256, 512],
block=BasicBlock, **kwargs)
if pretrained:
model.load_pretrained_model(data='imagenet', name='dla34', hash='ba72cf86')
return model
def dla46_c(pretrained=None, **kwargs): # DLA-46-C
Bottleneck.expansion = 2
model = DLA([1, 1, 1, 2, 2, 1],
[16, 32, 64, 64, 128, 256],
block=Bottleneck, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla46_c')
return model
def dla46x_c(pretrained=None, **kwargs): # DLA-X-46-C
BottleneckX.expansion = 2
model = DLA([1, 1, 1, 2, 2, 1],
[16, 32, 64, 64, 128, 256],
block=BottleneckX, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla46x_c')
return model
def dla60x_c(pretrained, **kwargs): # DLA-X-60-C
BottleneckX.expansion = 2
model = DLA([1, 1, 1, 2, 3, 1],
[16, 32, 64, 64, 128, 256],
block=BottleneckX, **kwargs)
if pretrained:
model.load_pretrained_model(data='imagenet', name='dla60x_c', hash='b870c45c')
return model
def dla60(pretrained=None, **kwargs): # DLA-60
Bottleneck.expansion = 2
model = DLA([1, 1, 1, 2, 3, 1],
[16, 32, 128, 256, 512, 1024],
block=Bottleneck, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla60')
return model
def dla60x(pretrained=None, **kwargs): # DLA-X-60
BottleneckX.expansion = 2
model = DLA([1, 1, 1, 2, 3, 1],
[16, 32, 128, 256, 512, 1024],
block=BottleneckX, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla60x')
return model
def dla102(pretrained=None, **kwargs): # DLA-102
Bottleneck.expansion = 2
model = DLA([1, 1, 1, 3, 4, 1], [16, 32, 128, 256, 512, 1024],
block=Bottleneck, residual_root=True, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla102')
return model
def dla102x(pretrained=None, **kwargs): # DLA-X-102
BottleneckX.expansion = 2
model = DLA([1, 1, 1, 3, 4, 1], [16, 32, 128, 256, 512, 1024],
block=BottleneckX, residual_root=True, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla102x')
return model
def dla102x2(pretrained=None, **kwargs): # DLA-X-102 64
BottleneckX.cardinality = 64
model = DLA([1, 1, 1, 3, 4, 1], [16, 32, 128, 256, 512, 1024],
block=BottleneckX, residual_root=True, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla102x2')
return model
def dla169(pretrained=None, **kwargs): # DLA-169
Bottleneck.expansion = 2
model = DLA([1, 1, 2, 3, 5, 1], [16, 32, 128, 256, 512, 1024],
block=Bottleneck, residual_root=True, **kwargs)
if pretrained is not None:
model.load_pretrained_model(pretrained, 'dla169')
return model
def set_bn(bn):
  global BatchNorm
  BatchNorm = bn
  # (the upstream code also patched a separate `dla` module here via
  # `dla.BatchNorm = bn`; the backbone is defined in this file, so updating
  # the module-level global is sufficient and avoids a NameError)
class Identity(nn.Module):
def __init__(self):
super(Identity, self).__init__()
def forward(self, x):
return x
def fill_up_weights(up):
w = up.weight.data
f = math.ceil(w.size(2) / 2)
c = (2 * f - 1 - f % 2) / (2. * f)
for i in range(w.size(2)):
for j in range(w.size(3)):
w[0, 0, i, j] = \
(1 - math.fabs(i / f - c)) * (1 - math.fabs(j / f - c))
for c in range(1, w.size(0)):
w[c, 0, :, :] = w[0, 0, :, :]
class IDAUp(nn.Module):
def __init__(self, node_kernel, out_dim, channels, up_factors):
super(IDAUp, self).__init__()
self.channels = channels
self.out_dim = out_dim
for i, c in enumerate(channels):
if c == out_dim:
proj = Identity()
else:
proj = nn.Sequential(
nn.Conv2d(c, out_dim,
kernel_size=1, stride=1, bias=False),
BatchNorm(out_dim),
nn.ReLU(inplace=True))
f = int(up_factors[i])
if f == 1:
up = Identity()
else:
up = nn.ConvTranspose2d(
out_dim, out_dim, f * 2, stride=f, padding=f // 2,
output_padding=0, groups=out_dim, bias=False)
fill_up_weights(up)
setattr(self, 'proj_' + str(i), proj)
setattr(self, 'up_' + str(i), up)
for i in range(1, len(channels)):
node = nn.Sequential(
nn.Conv2d(out_dim * 2, out_dim,
kernel_size=node_kernel, stride=1,
padding=node_kernel // 2, bias=False),
BatchNorm(out_dim),
nn.ReLU(inplace=True))
setattr(self, 'node_' + str(i), node)
for m in self.modules():
if isinstance(m, nn.Conv2d):
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data.normal_(0, math.sqrt(2. / n))
elif isinstance(m, BatchNorm):
m.weight.data.fill_(1)
m.bias.data.zero_()
def forward(self, layers):
assert len(self.channels) == len(layers), \
'{} vs {} layers'.format(len(self.channels), len(layers))
layers = list(layers)
for i, l in enumerate(layers):
upsample = getattr(self, 'up_' + str(i))
project = getattr(self, 'proj_' + str(i))
layers[i] = upsample(project(l))
x = layers[0]
y = []
for i in range(1, len(layers)):
node = getattr(self, 'node_' + str(i))
x = node(torch.cat([x, layers[i]], 1))
y.append(x)
return x, y
class DLAUp(nn.Module):
def __init__(self, channels, scales=(1, 2, 4, 8, 16), in_channels=None):
super(DLAUp, self).__init__()
if in_channels is None:
in_channels = channels
self.channels = channels
channels = list(channels)
scales = np.array(scales, dtype=int)
for i in range(len(channels) - 1):
j = -i - 2
setattr(self, 'ida_{}'.format(i),
IDAUp(3, channels[j], in_channels[j:],
scales[j:] // scales[j]))
scales[j + 1:] = scales[j]
in_channels[j + 1:] = [channels[j] for _ in channels[j + 1:]]
def forward(self, layers):
layers = list(layers)
assert len(layers) > 1
for i in range(len(layers) - 1):
ida = getattr(self, 'ida_{}'.format(i))
x, y = ida(layers[-i - 2:])
layers[-i - 1:] = y
return x
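# DLAUp applies the IDAUp modules iteratively: starting from the deepest
# levels, each ida_i upsamples and merges the last (i + 2) feature maps,
# rewriting the tail of `layers` until a single highest-resolution map is left.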
def fill_fc_weights(layers):
for m in layers.modules():
if isinstance(m, nn.Conv2d):
nn.init.normal_(m.weight, std=0.001)
# torch.nn.init.kaiming_normal_(m.weight.data, nonlinearity='relu')
# torch.nn.init.xavier_normal_(m.weight.data)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
class DLASeg(nn.Module):
def __init__(self, base_name, heads,
pretrained=True, down_ratio=4, head_conv=256):
super(DLASeg, self).__init__()
assert down_ratio in [2, 4, 8, 16]
self.heads = heads
self.first_level = int(np.log2(down_ratio))
self.base = globals()[base_name](
pretrained=pretrained, return_levels=True)
channels = self.base.channels
scales = [2 ** i for i in range(len(channels[self.first_level:]))]
self.dla_up = DLAUp(channels[self.first_level:], scales=scales)
'''
self.fc = nn.Sequential(
nn.Conv2d(channels[self.first_level], classes, kernel_size=1,
stride=1, padding=0, bias=True)
)
'''
for head in self.heads:
classes = self.heads[head]
if head_conv > 0:
fc = nn.Sequential(
nn.Conv2d(channels[self.first_level], head_conv,
kernel_size=3, padding=1, bias=True),
nn.ReLU(inplace=True),
nn.Conv2d(head_conv, classes,
kernel_size=1, stride=1,
padding=0, bias=True))
if 'hm' in head:
fc[-1].bias.data.fill_(-2.19)
else:
fill_fc_weights(fc)
else:
fc = nn.Conv2d(channels[self.first_level], classes,
kernel_size=1, stride=1,
padding=0, bias=True)
if 'hm' in head:
fc.bias.data.fill_(-2.19)
else:
fill_fc_weights(fc)
self.__setattr__(head, fc)
'''
up_factor = 2 ** self.first_level
if up_factor > 1:
up = nn.ConvTranspose2d(classes, classes, up_factor * 2,
stride=up_factor, padding=up_factor // 2,
output_padding=0, groups=classes,
bias=False)
fill_up_weights(up)
up.weight.requires_grad = False
else:
up = Identity()
self.up = up
self.softmax = nn.LogSoftmax(dim=1)
for m in self.fc.modules():
if isinstance(m, nn.Conv2d):
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data.normal_(0, math.sqrt(2. / n))
elif isinstance(m, BatchNorm):
m.weight.data.fill_(1)
m.bias.data.zero_()
'''
def forward(self, x):
x = self.base(x)
x = self.dla_up(x[self.first_level:])
# x = self.fc(x)
# y = self.softmax(self.up(x))
ret = {}
for head in self.heads:
ret[head] = self.__getattr__(head)(x)
return [ret]
'''
def optim_parameters(self, memo=None):
for param in self.base.parameters():
yield param
for param in self.dla_up.parameters():
yield param
for param in self.fc.parameters():
yield param
'''
'''
def dla34up(classes, pretrained_base=None, **kwargs):
model = DLASeg('dla34', classes, pretrained_base=pretrained_base, **kwargs)
return model
def dla60up(classes, pretrained_base=None, **kwargs):
model = DLASeg('dla60', classes, pretrained_base=pretrained_base, **kwargs)
return model
def dla102up(classes, pretrained_base=None, **kwargs):
model = DLASeg('dla102', classes,
pretrained_base=pretrained_base, **kwargs)
return model
def dla169up(classes, pretrained_base=None, **kwargs):
model = DLASeg('dla169', classes,
pretrained_base=pretrained_base, **kwargs)
return model
'''
def get_pose_net(num_layers, heads, head_conv=256, down_ratio=4):
model = DLASeg('dla{}'.format(num_layers), heads,
pretrained=True,
down_ratio=down_ratio,
head_conv=head_conv)
return model
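# Usage sketch (editorial; the heads dict below is a hypothetical
# CenterNet-style configuration, and pretrained=True will try to download
# ImageNet weights for the DLA backbone):
def _demo_get_pose_net():
    heads = {'hm': 3, 'wh': 2, 'reg': 2}
    net = get_pose_net(num_layers=34, heads=heads, head_conv=256, down_ratio=4)
    out = net(torch.randn(1, 3, 512, 512))[0]
    # each head predicts at 1/4 input resolution
    assert out['hm'].shape == (1, 3, 128, 128)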
from .efficientdet import EfficientDet
def get_efficientdet(num_layers, cfg):
model = EfficientDet(intermediate_channels=cfg.MODEL.INTERMEDIATE_CHANNEL)
return model
import torch.nn as nn
import torch.nn.functional as F
from .conv_module import ConvModule
import torch
class BIFPN(nn.Module):
def __init__(self,
in_channels,
out_channels,
num_outs,
start_level=0,
end_level=-1,
stack=1,
add_extra_convs=False,
extra_convs_on_inputs=True,
relu_before_extra_convs=False,
no_norm_on_lateral=False,
conv_cfg=None,
norm_cfg=None,
activation=None):
super(BIFPN, self).__init__()
assert isinstance(in_channels, list)
self.in_channels = in_channels
self.out_channels = out_channels
self.num_ins = len(in_channels)
self.num_outs = num_outs
self.activation = activation
self.relu_before_extra_convs = relu_before_extra_convs
self.no_norm_on_lateral = no_norm_on_lateral
self.stack = stack
if end_level == -1:
self.backbone_end_level = self.num_ins
assert num_outs >= self.num_ins - start_level
else:
# if end_level < inputs, no extra level is allowed
self.backbone_end_level = end_level
assert end_level <= len(in_channels)
assert num_outs == end_level - start_level
self.start_level = start_level
self.end_level = end_level
self.add_extra_convs = add_extra_convs
self.extra_convs_on_inputs = extra_convs_on_inputs
self.lateral_convs = nn.ModuleList()
self.fpn_convs = nn.ModuleList()
self.stack_bifpn_convs = nn.ModuleList()
for i in range(self.start_level, self.backbone_end_level):
l_conv = ConvModule(
in_channels[i],
out_channels,
1,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg if not self.no_norm_on_lateral else None,
activation=self.activation,
inplace=False)
self.lateral_convs.append(l_conv)
for ii in range(stack):
self.stack_bifpn_convs.append(BiFPNModule(channels=out_channels,
levels=self.backbone_end_level-self.start_level,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg,
activation=activation))
# add extra conv layers (e.g., RetinaNet)
extra_levels = num_outs - self.backbone_end_level + self.start_level
if add_extra_convs and extra_levels >= 1:
for i in range(extra_levels):
if i == 0 and self.extra_convs_on_inputs:
in_channels = self.in_channels[self.backbone_end_level - 1]
else:
in_channels = out_channels
extra_fpn_conv = ConvModule(
in_channels,
out_channels,
3,
stride=2,
padding=1,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg,
activation=self.activation,
inplace=False)
self.fpn_convs.append(extra_fpn_conv)
    # default init_weights: Xavier for plain convs; convs inside ConvModule
    # are initialized by ConvModule itself
    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
def forward(self, inputs):
assert len(inputs) == len(self.in_channels)
# build laterals
laterals = [
lateral_conv(inputs[i + self.start_level])
for i, lateral_conv in enumerate(self.lateral_convs)
]
# part 1: build top-down and down-top path with stack
used_backbone_levels = len(laterals)
for bifpn_module in self.stack_bifpn_convs:
laterals = bifpn_module(laterals)
outs = laterals
# part 2: add extra levels
if self.num_outs > len(outs):
# use max pool to get more levels on top of outputs
# (e.g., Faster R-CNN, Mask R-CNN)
if not self.add_extra_convs:
for i in range(self.num_outs - used_backbone_levels):
outs.append(F.max_pool2d(outs[-1], 1, stride=2))
# add conv layers on top of original feature maps (RetinaNet)
else:
if self.extra_convs_on_inputs:
orig = inputs[self.backbone_end_level - 1]
outs.append(self.fpn_convs[0](orig))
else:
outs.append(self.fpn_convs[0](outs[-1]))
for i in range(1, self.num_outs - used_backbone_levels):
if self.relu_before_extra_convs:
outs.append(self.fpn_convs[i](F.relu(outs[-1])))
else:
outs.append(self.fpn_convs[i](outs[-1]))
return tuple(outs)
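# Usage sketch (editorial; channels and strides are a hypothetical example):
# five backbone levels are first projected to a common width by the lateral
# 1x1 convs, then refined by `stack` BiFPN passes.
def _demo_bifpn():
    neck = BIFPN(in_channels=[40, 80, 112, 192, 320], out_channels=64,
                 num_outs=5, stack=2)
    feats = [torch.randn(2, c, s, s)
             for c, s in zip([40, 80, 112, 192, 320], [64, 32, 16, 8, 4])]
    outs = neck(feats)
    assert len(outs) == 5 and all(o.shape[1] == 64 for o in outs)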
class BiFPNModule(nn.Module):
def __init__(self,
channels,
levels,
init=0.5,
conv_cfg=None,
norm_cfg=None,
activation=None,
                 eps=0.0001):
super(BiFPNModule, self).__init__()
self.activation = activation
self.eps = eps
self.levels = levels
self.bifpn_convs = nn.ModuleList()
# weighted
self.w1 = nn.Parameter(torch.Tensor(2, levels).fill_(init))
self.relu1 = nn.ReLU()
self.w2 = nn.Parameter(torch.Tensor(3, levels - 2).fill_(init))
self.relu2 = nn.ReLU()
        for jj in range(2):  # one set of fusion convs for top-down, one for bottom-up
            for i in range(self.levels - 1):
fpn_conv = nn.Sequential(
ConvModule(
channels,
channels,
3,
padding=1,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg,
activation=self.activation,
inplace=False)
)
self.bifpn_convs.append(fpn_conv)
    # default init_weights: Xavier for plain convs; convs inside ConvModule
    # are initialized by ConvModule itself
    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
def forward(self, inputs):
assert len(inputs) == self.levels
# build top-down and down-top path with stack
levels = self.levels
# w relu
        w1 = self.relu1(self.w1)
        w1 = w1 / (torch.sum(w1, dim=0) + self.eps)  # normalized fusion weights
        w2 = self.relu2(self.w2)
        w2 = w2 / (torch.sum(w2, dim=0) + self.eps)  # normalized fusion weights
# build top-down
idx_bifpn = 0
pathtd = inputs
inputs_clone = []
for in_tensor in inputs:
inputs_clone.append(in_tensor.clone())
for i in range(levels - 1, 0, -1):
pathtd[i - 1] = (w1[0, i-1]*pathtd[i - 1] + w1[1, i-1]*F.interpolate(pathtd[i], scale_factor=2, mode='nearest'))/(w1[0, i-1] + w1[1, i-1] + self.eps)
pathtd[i - 1] = self.bifpn_convs[idx_bifpn](pathtd[i - 1])
idx_bifpn = idx_bifpn + 1
# build down-top
for i in range(0, levels - 2, 1):
pathtd[i + 1] = (w2[0, i] * pathtd[i + 1] + w2[1, i] * F.max_pool2d(pathtd[i], kernel_size=2) + w2[2, i] * inputs_clone[i + 1])/(w2[0, i] + w2[1, i] + w2[2, i] + self.eps)
pathtd[i + 1] = self.bifpn_convs[idx_bifpn](pathtd[i + 1])
idx_bifpn = idx_bifpn + 1
pathtd[levels - 1] = (w1[0, levels-1] * pathtd[levels - 1] + w1[1, levels-1] * F.max_pool2d(pathtd[levels - 2], kernel_size=2))/(w1[0, levels-1] + w1[1, levels-1] + self.eps)
pathtd[levels - 1] = self.bifpn_convs[idx_bifpn](pathtd[levels - 1])
return pathtd
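# Note (editorial): BiFPNModule implements EfficientDet's "fast normalized
# fusion": each output is sum_i(w_i * I_i) / (sum_i(w_i) + eps) with
# learnable, ReLU-clamped weights w_i. A minimal sketch of a single module
# (shapes are a hypothetical example):
def _demo_bifpn_module():
    m = BiFPNModule(channels=16, levels=3)
    feats = [torch.randn(2, 16, 32, 32),
             torch.randn(2, 16, 16, 16),
             torch.randn(2, 16, 8, 8)]
    outs = m(feats)
    assert [o.shape[-1] for o in outs] == [32, 16, 8]  # resolutions preserved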
import warnings
import torch.nn as nn
import torch.nn.functional as F
def conv_ws_2d(input,
weight,
bias=None,
stride=1,
padding=0,
dilation=1,
groups=1,
eps=1e-5):
c_in = weight.size(0)
weight_flat = weight.view(c_in, -1)
mean = weight_flat.mean(dim=1, keepdim=True).view(c_in, 1, 1, 1)
std = weight_flat.std(dim=1, keepdim=True).view(c_in, 1, 1, 1)
weight = (weight - mean) / (std + eps)
return F.conv2d(input, weight, bias, stride, padding, dilation, groups)
class ConvWS2d(nn.Conv2d):
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride=1,
padding=0,
dilation=1,
groups=1,
bias=True,
eps=1e-5):
super(ConvWS2d, self).__init__(
in_channels,
out_channels,
kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias)
self.eps = eps
def forward(self, x):
return conv_ws_2d(x, self.weight, self.bias, self.stride, self.padding,
self.dilation, self.groups, self.eps)
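# Usage sketch (editorial): ConvWS2d standardizes each filter to zero mean
# and unit variance before the convolution (Weight Standardization), which
# pairs well with GroupNorm. A hypothetical check:
def _demo_conv_ws():
    import torch  # this module only imports torch.nn / functional above
    conv = ConvWS2d(3, 8, kernel_size=3, padding=1)
    out = conv(torch.randn(2, 3, 16, 16))
    assert out.shape == (2, 8, 16, 16)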
conv_cfg = {
'Conv': nn.Conv2d,
'ConvWS': ConvWS2d,
# TODO: octave conv
}
def build_conv_layer(cfg, *args, **kwargs):
""" Build convolution layer
Args:
cfg (None or dict): cfg should contain:
type (str): identify conv layer type.
layer args: args needed to instantiate a conv layer.
Returns:
layer (nn.Module): created conv layer
"""
if cfg is None:
cfg_ = dict(type='Conv')
else:
assert isinstance(cfg, dict) and 'type' in cfg
cfg_ = cfg.copy()
layer_type = cfg_.pop('type')
if layer_type not in conv_cfg:
        raise KeyError('Unrecognized conv type {}'.format(layer_type))
else:
conv_layer = conv_cfg[layer_type]
layer = conv_layer(*args, **kwargs, **cfg_)
return layer
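# Usage sketch (editorial): the cfg dict selects the conv implementation by
# name; the remaining args are forwarded to that layer's constructor.
#   build_conv_layer(dict(type='ConvWS'), 16, 32, 3, padding=1)  # -> ConvWS2d
#   build_conv_layer(None, 16, 32, 3, padding=1)                 # -> nn.Conv2d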
norm_cfg = {
# format: layer_type: (abbreviation, module)
'BN': ('bn', nn.BatchNorm2d),
'SyncBN': ('bn', nn.SyncBatchNorm),
'GN': ('gn', nn.GroupNorm),
# and potentially 'SN'
}
def build_norm_layer(cfg, num_features, postfix=''):
""" Build normalization layer
Args:
cfg (dict): cfg should contain:
type (str): identify norm layer type.
layer args: args needed to instantiate a norm layer.
            requires_grad (bool): [optional] whether the layer's parameters
                should receive gradient updates
num_features (int): number of channels from input.
postfix (int, str): appended into norm abbreviation to
create named layer.
Returns:
name (str): abbreviation + postfix
layer (nn.Module): created norm layer
"""
assert isinstance(cfg, dict) and 'type' in cfg
cfg_ = cfg.copy()
layer_type = cfg_.pop('type')
if layer_type not in norm_cfg:
raise KeyError('Unrecognized norm type {}'.format(layer_type))
else:
abbr, norm_layer = norm_cfg[layer_type]
if norm_layer is None:
raise NotImplementedError
assert isinstance(postfix, (int, str))
name = abbr + str(postfix)
requires_grad = cfg_.pop('requires_grad', True)
cfg_.setdefault('eps', 1e-5)
if layer_type != 'GN':
layer = norm_layer(num_features, **cfg_)
if layer_type == 'SyncBN':
layer._specify_ddp_gpu_num(1)
else:
assert 'num_groups' in cfg_
layer = norm_layer(num_channels=num_features, **cfg_)
for param in layer.parameters():
param.requires_grad = requires_grad
return name, layer
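# Usage sketch (editorial):
#   build_norm_layer(dict(type='BN'), 64)
#       # -> ('bn', nn.BatchNorm2d(64))
#   build_norm_layer(dict(type='GN', num_groups=8), 64, postfix=1)
#       # -> ('gn1', nn.GroupNorm(8, 64))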
class ConvModule(nn.Module):
"""A conv block that contains conv/norm/activation layers.
Args:
in_channels (int): Same as nn.Conv2d.
out_channels (int): Same as nn.Conv2d.
kernel_size (int or tuple[int]): Same as nn.Conv2d.
stride (int or tuple[int]): Same as nn.Conv2d.
padding (int or tuple[int]): Same as nn.Conv2d.
dilation (int or tuple[int]): Same as nn.Conv2d.
groups (int): Same as nn.Conv2d.
bias (bool or str): If specified as `auto`, it will be decided by the
norm_cfg. Bias will be set as True if norm_cfg is None, otherwise
False.
conv_cfg (dict): Config dict for convolution layer.
norm_cfg (dict): Config dict for normalization layer.
activation (str or None): Activation type, "ReLU" by default.
inplace (bool): Whether to use inplace mode for activation.
order (tuple[str]): The order of conv/norm/activation layers. It is a
sequence of "conv", "norm" and "act". Examples are
("conv", "norm", "act") and ("act", "conv", "norm").
"""
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride=1,
padding=0,
dilation=1,
groups=1,
bias='auto',
conv_cfg=None,
norm_cfg=None,
activation='relu',
inplace=True,
order=('conv', 'norm', 'act')):
super(ConvModule, self).__init__()
assert conv_cfg is None or isinstance(conv_cfg, dict)
assert norm_cfg is None or isinstance(norm_cfg, dict)
self.conv_cfg = conv_cfg
self.norm_cfg = norm_cfg
self.activation = activation
self.inplace = inplace
self.order = order
assert isinstance(self.order, tuple) and len(self.order) == 3
assert set(order) == set(['conv', 'norm', 'act'])
self.with_norm = norm_cfg is not None
        self.with_activation = activation is not None
# if the conv layer is before a norm layer, bias is unnecessary.
if bias == 'auto':
bias = False if self.with_norm else True
self.with_bias = bias
if self.with_norm and self.with_bias:
warnings.warn('ConvModule has norm and bias at the same time')
# build convolution layer
self.conv = build_conv_layer(
conv_cfg,
in_channels,
out_channels,
kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=groups,
bias=bias)
# export the attributes of self.conv to a higher level for convenience
self.in_channels = self.conv.in_channels
self.out_channels = self.conv.out_channels
self.kernel_size = self.conv.kernel_size
self.stride = self.conv.stride
self.padding = self.conv.padding
self.dilation = self.conv.dilation
self.transposed = self.conv.transposed
self.output_padding = self.conv.output_padding
self.groups = self.conv.groups
# build normalization layers
if self.with_norm:
# norm layer is after conv layer
if order.index('norm') > order.index('conv'):
norm_channels = out_channels
else:
norm_channels = in_channels
self.norm_name, norm = build_norm_layer(norm_cfg, norm_channels)
self.add_module(self.norm_name, norm)
# build activation layer
        if self.with_activation:
# TODO: introduce `act_cfg` and supports more activation layers
if self.activation not in ['relu']:
raise ValueError('{} is currently not supported.'.format(
self.activation))
if self.activation == 'relu':
self.activate = nn.ReLU(inplace=inplace)
@property
def norm(self):
return getattr(self, self.norm_name)
def forward(self, x, activate=True, norm=True):
for layer in self.order:
if layer == 'conv':
x = self.conv(x)
elif layer == 'norm' and norm and self.with_norm:
x = self.norm(x)
            elif layer == 'act' and activate and self.with_activation:
x = self.activate(x)
return x
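# Usage sketch (editorial): a conv + BN + ReLU block; `bias` resolves to
# False automatically because a norm layer follows the conv. A hypothetical
# check:
def _demo_conv_module():
    import torch  # this module only imports torch.nn / functional above
    block = ConvModule(16, 32, 3, padding=1, norm_cfg=dict(type='BN'))
    y = block(torch.randn(2, 16, 8, 8))
    assert y.shape == (2, 32, 8, 8)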
import numpy as np
import torch.nn as nn
def xavier_init(module, gain=1, bias=0, distribution='normal'):
assert distribution in ['uniform', 'normal']
if distribution == 'uniform':
nn.init.xavier_uniform_(module.weight, gain=gain)
else:
nn.init.xavier_normal_(module.weight, gain=gain)
    if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def normal_init(module, mean=0, std=1, bias=0):
nn.init.normal_(module.weight, mean, std)
    if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def uniform_init(module, a=0, b=1, bias=0):
nn.init.uniform_(module.weight, a, b)
    if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def kaiming_init(module,
mode='fan_out',
nonlinearity='relu',
bias=0,
distribution='normal'):
assert distribution in ['uniform', 'normal']
if distribution == 'uniform':
nn.init.kaiming_uniform_(
module.weight, mode=mode, nonlinearity=nonlinearity)
else:
nn.init.kaiming_normal_(
module.weight, mode=mode, nonlinearity=nonlinearity)
    if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def bias_init_with_prob(prior_prob):
""" initialize conv/fc bias value according to giving probablity"""
bias_init = float(-np.log((1 - prior_prob) / prior_prob))
return bias_init
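# Worked example (editorial): bias_init_with_prob(0.01) = -log(0.99 / 0.01)
# ~= -4.595, the RetinaNet focal-loss prior. The -2.19 used for the 'hm'
# heads elsewhere in this repo corresponds to a prior of ~0.1, since
# -log(0.9 / 0.1) ~= -2.197.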
import torch
import torch.nn as nn
import math
from .efficientnet import EfficientNet
from .bifpn import BIFPN
from .retinahead import RetinaHead
from torchvision.ops import nms
import torch.nn.functional as F
MODEL_MAP = {
'efficientdet-d0': 'efficientnet-b0',
'efficientdet-d1': 'efficientnet-b1',
'efficientdet-d2': 'efficientnet-b2',
'efficientdet-d3': 'efficientnet-b3',
'efficientdet-d4': 'efficientnet-b4',
'efficientdet-d5': 'efficientnet-b5',
'efficientdet-d6': 'efficientnet-b6',
    'efficientdet-d7': 'efficientnet-b6',  # D7 reuses the B6 backbone
}
class EfficientDet(nn.Module):
    def __init__(self,
                 intermediate_channels,
                 network='efficientdet-d1',
                 D_bifpn=3,
                 W_bifpn=32,
                 D_class=3,
                 scale_ratios=[0.5, 1, 2, 4, 8, 16, 32],
                 ):
super(EfficientDet, self).__init__()
self.backbone = EfficientNet.from_pretrained(MODEL_MAP[network])
self.neck = BIFPN(in_channels=self.backbone.get_list_features(),
out_channels=W_bifpn,
stack=D_bifpn,
num_outs=7)
        self.bbox_head = RetinaHead(num_classes=intermediate_channels,
                                    in_channels=W_bifpn)
self.scale_ratios = scale_ratios
for m in self.modules():
if isinstance(m, nn.Conv2d):
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data.normal_(0, math.sqrt(2. / n))
elif isinstance(m, nn.BatchNorm2d):
m.weight.data.fill_(1)
m.bias.data.zero_()
self.freeze_bn()
def forward(self, inputs):
x = self.extract_feat(inputs)
outs = self.bbox_head(x)
return outs[0][1]
def freeze_bn(self):
'''Freeze BatchNorm layers.'''
for layer in self.modules():
if isinstance(layer, nn.BatchNorm2d):
layer.eval()
def extract_feat(self, img):
"""
Directly extract features from the backbone+neck
"""
x = self.backbone(img)
x = self.neck(x)
return x
import torch
from torch import nn
from torch.nn import functional as F
from .utils import (
round_filters,
round_repeats,
drop_connect,
get_same_padding_conv2d,
get_model_params,
efficientnet_params,
load_pretrained_weights,
Swish,
MemoryEfficientSwish,
)
class MBConvBlock(nn.Module):
"""
Mobile Inverted Residual Bottleneck Block
Args:
block_args (namedtuple): BlockArgs, see above
global_params (namedtuple): GlobalParam, see above
Attributes:
has_se (bool): Whether the block contains a Squeeze and Excitation layer.
"""
def __init__(self, block_args, global_params):
super().__init__()
self._block_args = block_args
self._bn_mom = 1 - global_params.batch_norm_momentum
self._bn_eps = global_params.batch_norm_epsilon
self.has_se = (self._block_args.se_ratio is not None) and (0 < self._block_args.se_ratio <= 1)
self.id_skip = block_args.id_skip # skip connection and drop connect
# Get static or dynamic convolution depending on image size
Conv2d = get_same_padding_conv2d(image_size=global_params.image_size)
# Expansion phase
inp = self._block_args.input_filters # number of input channels
oup = self._block_args.input_filters * self._block_args.expand_ratio # number of output channels
if self._block_args.expand_ratio != 1:
self._expand_conv = Conv2d(in_channels=inp, out_channels=oup, kernel_size=1, bias=False)
self._bn0 = nn.BatchNorm2d(num_features=oup, momentum=self._bn_mom, eps=self._bn_eps)
# Depthwise convolution phase
k = self._block_args.kernel_size
s = self._block_args.stride
self._depthwise_conv = Conv2d(
in_channels=oup, out_channels=oup, groups=oup, # groups makes it depthwise
kernel_size=k, stride=s, bias=False)
self._bn1 = nn.BatchNorm2d(num_features=oup, momentum=self._bn_mom, eps=self._bn_eps)
# Squeeze and Excitation layer, if desired
if self.has_se:
num_squeezed_channels = max(1, int(self._block_args.input_filters * self._block_args.se_ratio))
self._se_reduce = Conv2d(in_channels=oup, out_channels=num_squeezed_channels, kernel_size=1)
self._se_expand = Conv2d(in_channels=num_squeezed_channels, out_channels=oup, kernel_size=1)
# Output phase
final_oup = self._block_args.output_filters
self._project_conv = Conv2d(in_channels=oup, out_channels=final_oup, kernel_size=1, bias=False)
self._bn2 = nn.BatchNorm2d(num_features=final_oup, momentum=self._bn_mom, eps=self._bn_eps)
self._swish = MemoryEfficientSwish()
def forward(self, inputs, drop_connect_rate=None):
"""
:param inputs: input tensor
:param drop_connect_rate: drop connect rate (float, between 0 and 1)
:return: output of block
"""
# Expansion and Depthwise Convolution
x = inputs
if self._block_args.expand_ratio != 1:
x = self._swish(self._bn0(self._expand_conv(inputs)))
x = self._swish(self._bn1(self._depthwise_conv(x)))
# Squeeze and Excitation
if self.has_se:
x_squeezed = F.adaptive_avg_pool2d(x, 1)
x_squeezed = self._se_expand(self._swish(self._se_reduce(x_squeezed)))
x = torch.sigmoid(x_squeezed) * x
x = self._bn2(self._project_conv(x))
# Skip connection and drop connect
input_filters, output_filters = self._block_args.input_filters, self._block_args.output_filters
if self.id_skip and self._block_args.stride == 1 and input_filters == output_filters:
if drop_connect_rate:
x = drop_connect(x, p=drop_connect_rate, training=self.training)
x = x + inputs # skip connection
return x
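    # Note (editorial): in the standard implementation, drop_connect applies
    # stochastic depth: during training the residual branch is zeroed
    # per-sample with probability `drop_connect_rate` and surviving samples
    # are rescaled by 1 / (1 - rate), keeping the expected value unchanged.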
def set_swish(self, memory_efficient=True):
"""Sets swish function as memory efficient (for training) or standard (for export)"""
self._swish = MemoryEfficientSwish() if memory_efficient else Swish()
class EfficientNet(nn.Module):
"""
An EfficientNet model. Most easily loaded with the .from_name or .from_pretrained methods
Args:
blocks_args (list): A list of BlockArgs to construct blocks
global_params (namedtuple): A set of GlobalParams shared between blocks
Example:
model = EfficientNet.from_pretrained('efficientnet-b0')
"""
def __init__(self, blocks_args=None, global_params=None):
super().__init__()
assert isinstance(blocks_args, list), 'blocks_args should be a list'
assert len(blocks_args) > 0, 'block args must be greater than 0'
self._global_params = global_params
self._blocks_args = blocks_args
# Get static or dynamic convolution depending on image size
Conv2d = get_same_padding_conv2d(image_size=global_params.image_size)
# Batch norm parameters
bn_mom = 1 - self._global_params.batch_norm_momentum
bn_eps = self._global_params.batch_norm_epsilon
# Stem
in_channels = 3 # rgb
out_channels = round_filters(32, self._global_params) # number of output channels
self._conv_stem = Conv2d(in_channels, out_channels, kernel_size=3, stride=2, bias=False)
self._bn0 = nn.BatchNorm2d(num_features=out_channels, momentum=bn_mom, eps=bn_eps)
# Build blocks
self._blocks = nn.ModuleList([])
for i in range(len(self._blocks_args)):
# Update block input and output filters based on depth multiplier.
self._blocks_args[i] = self._blocks_args[i]._replace(
input_filters=round_filters(self._blocks_args[i].input_filters, self._global_params),
output_filters=round_filters(self._blocks_args[i].output_filters, self._global_params),
num_repeat=round_repeats(self._blocks_args[i].num_repeat, self._global_params)
)
# The first block needs to take care of stride and filter size increase.
self._blocks.append(MBConvBlock(self._blocks_args[i], self._global_params))
if self._blocks_args[i].num_repeat > 1:
self._blocks_args[i] = self._blocks_args[i]._replace(input_filters=self._blocks_args[i].output_filters, stride=1)
for _ in range(self._blocks_args[i].num_repeat - 1):
self._blocks.append(MBConvBlock(self._blocks_args[i], self._global_params))
        # Head
in_channels = self._blocks_args[len(self._blocks_args)-1].output_filters # output of final block
out_channels = round_filters(1280, self._global_params)
self._conv_head = Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
self._bn1 = nn.BatchNorm2d(num_features=out_channels, momentum=bn_mom, eps=bn_eps)
# Final linear layer
self._avg_pooling = nn.AdaptiveAvgPool2d(1)
self._dropout = nn.Dropout(self._global_params.dropout_rate)
self._fc = nn.Linear(out_channels, self._global_params.num_classes)
self._swish = MemoryEfficientSwish()
def set_swish(self, memory_efficient=True):
"""Sets swish function as memory efficient (for training) or standard (for export)"""
self._swish = MemoryEfficientSwish() if memory_efficient else Swish()
for block in self._blocks:
block.set_swish(memory_efficient)
def extract_features(self, inputs):
""" Returns output of the final convolution layer """
# Stem
x = self._swish(self._bn0(self._conv_stem(inputs)))
P = []
index = 0
num_repeat = 0
# Blocks
for idx, block in enumerate(self._blocks):
drop_connect_rate = self._global_params.drop_connect_rate
if drop_connect_rate:
drop_connect_rate *= float(idx) / len(self._blocks)
x = block(x, drop_connect_rate=drop_connect_rate)
num_repeat = num_repeat + 1
            if num_repeat == self._blocks_args[index].num_repeat:
num_repeat = 0
index = index + 1
P.append(x)
return P
def forward(self, inputs):
""" Calls extract_features to extract features, applies final linear layer, and returns logits. """
# Convolution layers
P = self.extract_features(inputs)
return P
@classmethod
def from_name(cls, model_name, override_params=None):
cls._check_model_name_is_valid(model_name)
blocks_args, global_params = get_model_params(model_name, override_params)
return cls(blocks_args, global_params)
@classmethod
def from_pretrained(cls, model_name, num_classes=1000, in_channels = 3):
model = cls.from_name(model_name, override_params={'num_classes': num_classes})
load_pretrained_weights(model, model_name, load_fc=(num_classes == 1000))
if in_channels != 3:
Conv2d = get_same_padding_conv2d(image_size = model._global_params.image_size)
out_channels = round_filters(32, model._global_params)
model._conv_stem = Conv2d(in_channels, out_channels, kernel_size=3, stride=2, bias=False)
return model
@classmethod
def get_image_size(cls, model_name):
cls._check_model_name_is_valid(model_name)
_, _, res, _ = efficientnet_params(model_name)
return res
@classmethod
def _check_model_name_is_valid(cls, model_name, also_need_pretrained_weights=False):
""" Validates model name. None that pretrained weights are only available for
the first four models (efficientnet-b{i} for i in 0,1,2,3) at the moment. """
num_models = 4 if also_need_pretrained_weights else 8
valid_models = ['efficientnet-b'+str(i) for i in range(num_models)]
if model_name not in valid_models:
raise ValueError('model_name should be one of: ' + ', '.join(valid_models))
def get_list_features(self):
list_feature = []
for idx in range(len(self._blocks_args)):
list_feature.append(self._blocks_args[idx].output_filters)
return list_feature
if __name__ == '__main__':
model = EfficientNet.from_pretrained('efficientnet-b0')
inputs = torch.randn(4, 3, 640, 640)
P = model(inputs)
for idx, p in enumerate(P):
print('P{}: {}'.format(idx, p.size()))
# print('model: ', model)
import numpy as np
import torch
import torch.nn as nn
class BBoxTransform(nn.Module):
def __init__(self, mean=None, std=None):
super(BBoxTransform, self).__init__()
if mean is None:
self.mean = torch.from_numpy(np.array([0, 0, 0, 0]).astype(np.float32))
else:
self.mean = mean
if std is None:
self.std = torch.from_numpy(np.array([0.1, 0.1, 0.2, 0.2]).astype(np.float32))
else:
self.std = std
def forward(self, boxes, deltas):
widths = boxes[:, :, 2] - boxes[:, :, 0]
heights = boxes[:, :, 3] - boxes[:, :, 1]
ctr_x = boxes[:, :, 0] + 0.5 * widths
ctr_y = boxes[:, :, 1] + 0.5 * heights
dx = deltas[:, :, 0] * self.std[0] + self.mean[0]
dy = deltas[:, :, 1] * self.std[1] + self.mean[1]
dw = deltas[:, :, 2] * self.std[2] + self.mean[2]
dh = deltas[:, :, 3] * self.std[3] + self.mean[3]
pred_ctr_x = ctr_x + dx * widths
pred_ctr_y = ctr_y + dy * heights
pred_w = torch.exp(dw) * widths
pred_h = torch.exp(dh) * heights
pred_boxes_x1 = pred_ctr_x - 0.5 * pred_w
pred_boxes_y1 = pred_ctr_y - 0.5 * pred_h
pred_boxes_x2 = pred_ctr_x + 0.5 * pred_w
pred_boxes_y2 = pred_ctr_y + 0.5 * pred_h
pred_boxes = torch.stack([pred_boxes_x1, pred_boxes_y1, pred_boxes_x2, pred_boxes_y2], dim=2)
return pred_boxes
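# Worked example (editorial): with zero deltas the anchors are returned
# unchanged, since dx = dy = dw = dh = 0 gives pred_ctr = ctr and
# pred_w = exp(0) * w = w. A hypothetical check:
def _demo_bbox_transform():
    bt = BBoxTransform()
    boxes = torch.tensor([[[10., 10., 50., 30.]]])  # (B, N, 4) as x1,y1,x2,y2
    out = bt(boxes, torch.zeros(1, 1, 4))
    assert torch.allclose(out, boxes)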
class ClipBoxes(nn.Module):
def __init__(self, width=None, height=None):
super(ClipBoxes, self).__init__()
def forward(self, boxes, img):
batch_size, num_channels, height, width = img.shape
boxes[:, :, 0] = torch.clamp(boxes[:, :, 0], min=0)
boxes[:, :, 1] = torch.clamp(boxes[:, :, 1], min=0)
boxes[:, :, 2] = torch.clamp(boxes[:, :, 2], max=width)
boxes[:, :, 3] = torch.clamp(boxes[:, :, 3], max=height)
return boxes
class RegressionModel(nn.Module):
def __init__(self, num_features_in, num_anchors=9, feature_size=256):
super(RegressionModel, self).__init__()
self.conv1 = nn.Conv2d(num_features_in, feature_size, kernel_size=3, padding=1)
self.act1 = nn.ReLU()
self.conv2 = nn.Conv2d(feature_size, feature_size, kernel_size=3, padding=1)
self.act2 = nn.ReLU()
self.conv3 = nn.Conv2d(feature_size, feature_size, kernel_size=3, padding=1)
self.act3 = nn.ReLU()
self.conv4 = nn.Conv2d(feature_size, feature_size, kernel_size=3, padding=1)
self.act4 = nn.ReLU()
self.output = nn.Conv2d(feature_size, num_anchors*4, kernel_size=3, padding=1)
def forward(self, x):
out = self.conv1(x)
out = self.act1(out)
out = self.conv2(out)
out = self.act2(out)
out = self.conv3(out)
out = self.act3(out)
out = self.conv4(out)
out = self.act4(out)
out = self.output(out)
# out is B x C x W x H, with C = 4*num_anchors
out = out.permute(0, 2, 3, 1)
return out.contiguous().view(out.shape[0], -1, 4)
class ClassificationModel(nn.Module):
def __init__(self, num_features_in, num_anchors=9, num_classes=80, prior=0.01, feature_size=256):
super(ClassificationModel, self).__init__()
self.num_classes = num_classes
self.num_anchors = num_anchors
self.conv1 = nn.Conv2d(num_features_in, feature_size, kernel_size=3, padding=1)
self.act1 = nn.ReLU()
self.conv2 = nn.Conv2d(feature_size, feature_size, kernel_size=3, padding=1)
self.act2 = nn.ReLU()
self.conv3 = nn.Conv2d(feature_size, feature_size, kernel_size=3, padding=1)
self.act3 = nn.ReLU()
self.conv4 = nn.Conv2d(feature_size, feature_size, kernel_size=3, padding=1)
self.act4 = nn.ReLU()
self.output = nn.Conv2d(feature_size, num_anchors*num_classes, kernel_size=3, padding=1)
self.output_act = nn.Sigmoid()
def forward(self, x):
out = self.conv1(x)
out = self.act1(out)
out = self.conv2(out)
out = self.act2(out)
out = self.conv3(out)
out = self.act3(out)
out = self.conv4(out)
out = self.act4(out)
out = self.output(out)
out = self.output_act(out)
        # out is B x C x W x H, with C = num_anchors * num_classes
out1 = out.permute(0, 2, 3, 1)
        batch_size, height, width, channels = out1.shape
        out2 = out1.view(batch_size, height, width, self.num_anchors, self.num_classes)
return out2.contiguous().view(x.shape[0], -1, self.num_classes)
class Anchors(nn.Module):
def __init__(self, pyramid_levels=None, strides=None, sizes=None, ratios=None, scales=None):
super(Anchors, self).__init__()
        self.pyramid_levels = pyramid_levels if pyramid_levels is not None else [3, 4, 5, 6, 7]
        self.strides = strides if strides is not None else [2 ** x for x in self.pyramid_levels]
        self.sizes = sizes if sizes is not None else [2 ** (x + 2) for x in self.pyramid_levels]
        self.ratios = ratios if ratios is not None else np.array([0.5, 1, 2])
        self.scales = scales if scales is not None else np.array([2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)])
def forward(self, image):
image_shape = image.shape[2:]
image_shape = np.array(image_shape)
image_shapes = [(image_shape + 2 ** x - 1) // (2 ** x) for x in self.pyramid_levels]
# compute anchors over all pyramid levels
all_anchors = np.zeros((0, 4)).astype(np.float32)
for idx, p in enumerate(self.pyramid_levels):
anchors = generate_anchors(base_size=self.sizes[idx], ratios=self.ratios, scales=self.scales)
shifted_anchors = shift(image_shapes[idx], self.strides[idx], anchors)
all_anchors = np.append(all_anchors, shifted_anchors, axis=0)
all_anchors = np.expand_dims(all_anchors, axis=0)
return torch.from_numpy(all_anchors.astype(np.float32)).to(image.device)
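# Usage sketch (editorial): for a 256x256 input, the default levels 3..7
# give grids of 32, 16, 8, 4 and 2 cells per side, i.e.
# (32^2 + 16^2 + 8^2 + 4^2 + 2^2) * 9 = 12276 anchors in total.
def _demo_anchors():
    a = Anchors()(torch.zeros(1, 3, 256, 256))
    assert a.shape == (1, 12276, 4)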
def generate_anchors(base_size=16, ratios=None, scales=None):
"""
Generate anchor (reference) windows by enumerating aspect ratios X
scales w.r.t. a reference window.
"""
if ratios is None:
ratios = np.array([0.5, 1, 2])
if scales is None:
scales = np.array([2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)])
num_anchors = len(ratios) * len(scales)
# initialize output anchors
anchors = np.zeros((num_anchors, 4))
# scale base_size
anchors[:, 2:] = base_size * np.tile(scales, (2, len(ratios))).T
# compute areas of anchors
areas = anchors[:, 2] * anchors[:, 3]
# correct for ratios
anchors[:, 2] = np.sqrt(areas / np.repeat(ratios, len(scales)))
anchors[:, 3] = anchors[:, 2] * np.repeat(ratios, len(scales))
# transform from (x_ctr, y_ctr, w, h) -> (x1, y1, x2, y2)
anchors[:, 0::2] -= np.tile(anchors[:, 2] * 0.5, (2, 1)).T
anchors[:, 1::2] -= np.tile(anchors[:, 3] * 0.5, (2, 1)).T
return anchors
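# Worked example (editorial): the defaults produce 9 anchors (3 ratios x 3
# scales) centred on the origin; for base_size=16, ratio=1, scale=1 the
# anchor is [-8, -8, 8, 8], i.e. a 16x16 box in (x1, y1, x2, y2) form.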
def compute_shape(image_shape, pyramid_levels):
"""Compute shapes based on pyramid levels.
:param image_shape:
:param pyramid_levels:
:return:
"""
image_shape = np.array(image_shape[:2])
image_shapes = [(image_shape + 2 ** x - 1) // (2 ** x) for x in pyramid_levels]
return image_shapes
def anchors_for_shape(
image_shape,
pyramid_levels=None,
ratios=None,
scales=None,
strides=None,
sizes=None,
shapes_callback=None,
):
image_shapes = compute_shape(image_shape, pyramid_levels)
# compute anchors over all pyramid levels
all_anchors = np.zeros((0, 4))
for idx, p in enumerate(pyramid_levels):
anchors = generate_anchors(base_size=sizes[idx], ratios=ratios, scales=scales)
shifted_anchors = shift(image_shapes[idx], strides[idx], anchors)
all_anchors = np.append(all_anchors, shifted_anchors, axis=0)
return all_anchors
def shift(shape, stride, anchors):
shift_x = (np.arange(0, shape[1]) + 0.5) * stride
shift_y = (np.arange(0, shape[0]) + 0.5) * stride
shift_x, shift_y = np.meshgrid(shift_x, shift_y)
shifts = np.vstack((
shift_x.ravel(), shift_y.ravel(),
shift_x.ravel(), shift_y.ravel()
)).transpose()
# add A anchors (1, A, 4) to
# cell K shifts (K, 1, 4) to get
# shift anchors (K, A, 4)
# reshape to (K*A, 4) shifted anchors
A = anchors.shape[0]
K = shifts.shape[0]
all_anchors = (anchors.reshape((1, A, 4)) + shifts.reshape((1, K, 4)).transpose((1, 0, 2)))
all_anchors = all_anchors.reshape((K * A, 4))
return all_anchors
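# Usage sketch (editorial): for an 8x8 feature map at stride 8, shift() tiles
# the 9 base anchors over the 64 cell centres ((x + 0.5) * stride), returning
# a (64 * 9, 4) = (576, 4) array of absolute (x1, y1, x2, y2) anchors.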