Commit c27fee37 authored by dengjb's avatar dengjb
Browse files

update

parent 420f8331
Pipeline #2788 canceled with stages
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from .data_aug import point_range_filter, data_augment
from .kitti import Kitti
from .dataloader import get_dataloader
import copy
import numba
import numpy as np
import os
import pdb
from pointpillars.utils import bbox3d2bevcorners, box_collision_test, read_points, \
remove_pts_in_bboxes, limit_period
def dbsample(CLASSES, data_root, data_dict, db_sampler, sample_groups):
    '''
    GT-database augmentation: paste pre-cropped ground-truth objects (points +
    boxes) from a database into the current sample, skipping candidates whose
    BEV footprint collides with already-present boxes.

    CLASSES: dict(Pedestrian=0, Cyclist=1, Car=2)
    data_root: str, data root
    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty, image_info, calib_info)
    db_sampler: dict mapping class name -> sampler object with .sample(n)
    sample_groups: dict mapping class name -> target instance count per sample
    return: data_dict with sampled objects merged in
    '''
    pts, gt_bboxes_3d = data_dict['pts'], data_dict['gt_bboxes_3d']
    gt_labels, gt_names = data_dict['gt_labels'], data_dict['gt_names']
    gt_difficulty = data_dict['difficulty']
    image_info, calib_info = data_dict['image_info'], data_dict['calib_info']

    sampled_pts, sampled_names, sampled_labels = [], [], []
    sampled_bboxes, sampled_difficulty = [], []

    # boxes that new candidates must not collide with (grows as we accept candidates)
    avoid_coll_boxes = copy.deepcopy(gt_bboxes_3d)
    for name, v in sample_groups.items():
        # 1. how many more instances of this class are needed
        sampled_num = v - np.sum(gt_names == name)
        if sampled_num <= 0:
            continue
        # 2. draw candidate boxes from the database
        sampled_cls_list = db_sampler[name].sample(sampled_num)
        sampled_cls_bboxes = np.array([item['box3d_lidar'] for item in sampled_cls_list], dtype=np.float32)
        # 3. BEV collision test: candidates vs (existing + already-accepted) boxes
        avoid_coll_boxes_bv_corners = bbox3d2bevcorners(avoid_coll_boxes)
        sampled_cls_bboxes_bv_corners = bbox3d2bevcorners(sampled_cls_bboxes)
        coll_query_matrix = np.concatenate([avoid_coll_boxes_bv_corners, sampled_cls_bboxes_bv_corners], axis=0)
        coll_mat = box_collision_test(coll_query_matrix, coll_query_matrix)

        n_gt, tmp_bboxes = len(avoid_coll_boxes_bv_corners), []
        for i in range(n_gt, len(coll_mat)):
            if any(coll_mat[i]):
                # colliding candidate: drop it, and clear its row/column so it
                # does not block later candidates
                coll_mat[i] = False
                coll_mat[:, i] = False
            else:
                cur_sample = sampled_cls_list[i - n_gt]
                pt_path = os.path.join(data_root, cur_sample['path'])
                sampled_pts_cur = read_points(pt_path)
                # database points are stored relative to the box center
                sampled_pts_cur[:, :3] += cur_sample['box3d_lidar'][:3]
                sampled_pts.append(sampled_pts_cur)
                sampled_names.append(cur_sample['name'])
                sampled_labels.append(CLASSES[cur_sample['name']])
                sampled_bboxes.append(cur_sample['box3d_lidar'])
                tmp_bboxes.append(cur_sample['box3d_lidar'])
                sampled_difficulty.append(cur_sample['difficulty'])
        if len(tmp_bboxes) == 0:
            # keep a (0, 7) shape so the concatenate below stays valid
            tmp_bboxes = np.array(tmp_bboxes).reshape(-1, 7)
        else:
            tmp_bboxes = np.array(tmp_bboxes)
        avoid_coll_boxes = np.concatenate([avoid_coll_boxes, tmp_bboxes], axis=0)

    # BUGFIX: if nothing survived sampling/collision filtering, np.stack([])
    # below would raise ValueError; return the sample unchanged instead.
    if len(sampled_bboxes) == 0:
        return data_dict

    # merge sampled database objects:
    # remove raw points falling inside the pasted boxes, then prepend the
    # pasted points so they are not occluded by the raw cloud.
    pts = remove_pts_in_bboxes(pts, np.stack(sampled_bboxes, axis=0))
    pts = np.concatenate([np.concatenate(sampled_pts, axis=0), pts], axis=0)
    gt_bboxes_3d = avoid_coll_boxes.astype(np.float32)
    gt_labels = np.concatenate([gt_labels, np.array(sampled_labels)], axis=0)
    gt_names = np.concatenate([gt_names, np.array(sampled_names)], axis=0)
    difficulty = np.concatenate([gt_difficulty, np.array(sampled_difficulty)], axis=0)
    data_dict = {
        'pts': pts,
        'gt_bboxes_3d': gt_bboxes_3d,
        'gt_labels': gt_labels,
        'gt_names': gt_names,
        'difficulty': difficulty,
        'image_info': image_info,
        'calib_info': calib_info
    }
    return data_dict
@numba.jit(nopython=True)
def object_noise_core(pts, gt_bboxes_3d, bev_corners, trans_vec, rot_angle, rot_mat, masks):
    '''
    Per-object noise: independently rotate/translate each gt bbox (and the
    points inside it), keeping only noise candidates that pass a BEV
    collision test against all other bboxes. Mutates its array arguments
    in place and also returns them.

    pts: (N, 4)
    gt_bboxes_3d: (n_bbox, 7)
    bev_corners: (n_bbox, 4, 2)
    trans_vec: (n_bbox, num_try, 3)
    rot_angle: (n_bbox, num_try)
    rot_mat: (n_bbox, num_try, 2, 2)
    masks: (N, n_bbox), bool
    return: gt_bboxes_3d, pts
    '''
    # 1. for each bbox, accept the first of num_try noise candidates that
    #    passes the collision test.
    n_bbox, num_try = trans_vec.shape[:2]
    # succ_mask[i] == j means candidate j was accepted for bbox i; -1 denotes failure.
    succ_mask = -np.ones((n_bbox, ), dtype=np.int_)
    for i in range(n_bbox):
        for j in range(num_try):
            # rotate the corners around the bbox center, then translate
            cur_bbox = bev_corners[i] - np.expand_dims(gt_bboxes_3d[i, :2], 0) # (4, 2) - (1, 2) -> (4, 2)
            rot = np.zeros((2, 2), dtype=np.float32)
            rot[:] = rot_mat[i, j] # (2, 2)
            trans = trans_vec[i, j] # (3, )
            cur_bbox = cur_bbox @ rot
            cur_bbox += gt_bboxes_3d[i, :2]
            cur_bbox += np.expand_dims(trans[:2], 0) # (4, 2)
            coll_mat = box_collision_test(np.expand_dims(cur_bbox, 0), bev_corners)
            coll_mat[0, i] = False  # ignore self-collision
            if coll_mat.any():
                continue
            else:
                bev_corners[i] = cur_bbox # update the bev_corners when adding noise successfully.
                succ_mask[i] = j
                break
    # 2. apply the accepted noise to the points inside each bbox and to the bbox itself
    visit = {}  # point index -> 1 once moved; a point is moved by at most one bbox
    for i in range(n_bbox):
        jj = succ_mask[i]
        if jj == -1:
            continue
        cur_trans, cur_angle = trans_vec[i, jj], rot_angle[i, jj]
        cur_rot_mat = np.zeros((2, 2), dtype=np.float32)
        cur_rot_mat[:] = rot_mat[i, jj]
        for k in range(len(pts)):
            if masks[k][i] and k not in visit:
                # rotate the point around the bbox center, then translate
                cur_pt = pts[k] # (4, )
                cur_pt_xyz = np.zeros((1, 3), dtype=np.float32)
                cur_pt_xyz[0] = cur_pt[:3] - gt_bboxes_3d[i][:3]
                tmp_cur_pt_xy = np.zeros((1, 2), dtype=np.float32)
                tmp_cur_pt_xy[:] = cur_pt_xyz[:, :2]
                cur_pt_xyz[:, :2] = tmp_cur_pt_xy @ cur_rot_mat # (1, 2)
                cur_pt_xyz[0] = cur_pt_xyz[0] + gt_bboxes_3d[i][:3]
                cur_pt_xyz[0] = cur_pt_xyz[0] + cur_trans[:3]
                cur_pt[:3] = cur_pt_xyz[0]
                visit[k] = 1
        gt_bboxes_3d[i, :3] += cur_trans[:3]
        gt_bboxes_3d[i, 6] += cur_angle
    return gt_bboxes_3d, pts
def object_noise(data_dict, num_try, translation_std, rot_range):
    '''
    Add independent random rotation + translation noise to each gt bbox and
    the points inside it (the heavy lifting is in object_noise_core).

    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    num_try: int, e.g. 100 noise candidates generated per bbox
    translation_std: shape=[3, ]
    rot_range: shape=[2, ]
    return: data_dict
    '''
    pts, gt_bboxes_3d = data_dict['pts'], data_dict['gt_bboxes_3d']
    n_bbox = len(gt_bboxes_3d)

    # 1. generate per-bbox, per-try translation vectors and rotation matrices
    trans_vec = np.random.normal(scale=translation_std, size=(n_bbox, num_try, 3)).astype(np.float32)
    rot_angle = np.random.uniform(rot_range[0], rot_range[1], size=(n_bbox, num_try)).astype(np.float32)
    rot_cos, rot_sin = np.cos(rot_angle), np.sin(rot_angle)
    # note: after the transpose below each 2x2 matrix is effectively the
    # rotation by -rot_angle
    rot_mat = np.array([[rot_cos, rot_sin],
                        [-rot_sin, rot_cos]]) # (2, 2, n_bbox, num_try)
    rot_mat = np.transpose(rot_mat, (2, 3, 1, 0)) # (n_bbox, num_try, 2, 2)

    # 2. generate noise for each bbox and the points inside the bbox.
    bev_corners = bbox3d2bevcorners(gt_bboxes_3d) # (n_bbox, 4, 2) # for collision test
    masks = remove_pts_in_bboxes(pts, gt_bboxes_3d, rm=False) # identify which point should be added noise
    gt_bboxes_3d, pts = object_noise_core(pts=pts,
                                          gt_bboxes_3d=gt_bboxes_3d,
                                          bev_corners=bev_corners,
                                          trans_vec=trans_vec,
                                          rot_angle=rot_angle,
                                          rot_mat=rot_mat,
                                          masks=masks)
    data_dict.update({'gt_bboxes_3d': gt_bboxes_3d})
    data_dict.update({'pts': pts})
    return data_dict
def random_flip(data_dict, random_flip_ratio):
    '''
    Randomly mirror the scene across the x-axis (y -> -y) with probability
    random_flip_ratio; boxes, points and yaw angles are flipped together.

    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    random_flip_ratio: float, 0-1
    return: data_dict
    '''
    do_flip = np.random.choice([True, False], p=[random_flip_ratio, 1 - random_flip_ratio])
    if not do_flip:
        return data_dict
    pts = data_dict['pts']
    bboxes = data_dict['gt_bboxes_3d']
    pts[:, 1] *= -1
    bboxes[:, 1] *= -1
    # mirroring negates the yaw; the extra pi keeps the heading convention
    bboxes[:, 6] = np.pi - bboxes[:, 6]
    data_dict['pts'] = pts
    data_dict['gt_bboxes_3d'] = bboxes
    return data_dict
def global_rot_scale_trans(data_dict, rot_range, scale_ratio_range, translation_std):
    '''
    Apply a single global rotation (about z), uniform scaling, and gaussian
    translation to the whole scene (points and boxes together).

    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    rot_range: [a, b] rotation angle bounds (radians)
    scale_ratio_range: [c, d] scale factor bounds
    translation_std: [e, f, g] per-axis gaussian std
    return: data_dict
    '''
    pts = data_dict['pts']
    gt_bboxes_3d = data_dict['gt_bboxes_3d']

    # 1. global rotation about the z axis
    angle = np.random.uniform(rot_range[0], rot_range[1])
    c, s = np.cos(angle), np.sin(angle)
    # note: as applied below (via .T) this rotates by -angle
    rot_mat = np.array([[c, s],
                        [-s, c]])  # (2, 2)
    gt_bboxes_3d[:, :2] = gt_bboxes_3d[:, :2] @ rot_mat.T
    gt_bboxes_3d[:, 6] += angle
    pts[:, :2] = pts[:, :2] @ rot_mat.T

    # 2. global scaling (box centers and sizes, point coordinates)
    scale = np.random.uniform(scale_ratio_range[0], scale_ratio_range[1])
    gt_bboxes_3d[:, :6] *= scale
    pts[:, :3] *= scale

    # 3. global translation
    shift = np.random.normal(scale=translation_std, size=(1, 3))
    gt_bboxes_3d[:, :3] += shift
    pts[:, :3] += shift

    data_dict['gt_bboxes_3d'] = gt_bboxes_3d
    data_dict['pts'] = pts
    return data_dict
def point_range_filter(data_dict, point_range):
    '''
    Keep only the points strictly inside the axis-aligned box
    [x1, y1, z1] < (x, y, z) < [x2, y2, z2]; boundary points are dropped.

    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    point_range: [x1, y1, z1, x2, y2, z2]
    '''
    pts = data_dict['pts']
    lo = np.asarray(point_range[:3])
    hi = np.asarray(point_range[3:6])
    keep_mask = np.all((pts[:, :3] > lo) & (pts[:, :3] < hi), axis=1)
    data_dict['pts'] = pts[keep_mask]
    return data_dict
def object_range_filter(data_dict, object_range):
    '''
    Keep only gt boxes whose BEV center (x, y) lies strictly inside the
    range; z is intentionally ignored. Remaining yaw angles are wrapped
    via limit_period.

    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    object_range: [x1, y1, z1, x2, y2, z2]
    '''
    gt_bboxes_3d = data_dict['gt_bboxes_3d']
    gt_labels = data_dict['gt_labels']
    gt_names = data_dict['gt_names']
    difficulty = data_dict['difficulty']

    # BEV (x, y) filter only
    keep_mask = (
        (gt_bboxes_3d[:, 0] > object_range[0])
        & (gt_bboxes_3d[:, 1] > object_range[1])
        & (gt_bboxes_3d[:, 0] < object_range[3])
        & (gt_bboxes_3d[:, 1] < object_range[4])
    )
    gt_bboxes_3d = gt_bboxes_3d[keep_mask]
    gt_labels = gt_labels[keep_mask]
    gt_names = gt_names[keep_mask]
    difficulty = difficulty[keep_mask]
    gt_bboxes_3d[:, 6] = limit_period(gt_bboxes_3d[:, 6], 0.5, 2 * np.pi)

    data_dict['gt_bboxes_3d'] = gt_bboxes_3d
    data_dict['gt_labels'] = gt_labels
    data_dict['gt_names'] = gt_names
    data_dict['difficulty'] = difficulty
    return data_dict
def points_shuffle(data_dict):
    '''
    Randomly permute the order of the points.

    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    '''
    pts = data_dict['pts']
    # np.random.permutation(n) == shuffle of arange(n): same RNG consumption
    data_dict['pts'] = pts[np.random.permutation(len(pts))]
    return data_dict
def filter_bboxes_with_labels(data_dict, label=-1):
    '''
    Drop every annotation whose label equals `label` (default -1, the
    "unknown class" marker) from the box/label/name/difficulty arrays.

    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    label: int
    '''
    keep = data_dict['gt_labels'] != label
    for key in ('gt_bboxes_3d', 'gt_labels', 'gt_names', 'difficulty'):
        data_dict[key] = data_dict[key][keep]
    return data_dict
def data_augment(CLASSES, data_root, data_dict, data_aug_config):
    '''
    Training-time augmentation pipeline: gt-database sampling, per-object
    noise, random flip, global rot/scale/trans, range filtering, shuffling.

    CLASSES: dict(Pedestrian=0, Cyclist=1, Car=2)
    data_root: str, data root
    data_dict: dict(pts, gt_bboxes_3d, gt_labels, gt_names, difficulty)
    data_aug_config: dict()
    return: data_dict
    '''
    # 1. sample databases and merge into the data
    db_sampler_config = data_aug_config['db_sampler']
    data_dict = dbsample(CLASSES,
                         data_root,
                         data_dict,
                         db_sampler=db_sampler_config['db_sampler'],
                         sample_groups=db_sampler_config['sample_groups'])

    # 2. object noise (per-box rotation + translation)
    object_noise_config = data_aug_config['object_noise']
    data_dict = object_noise(data_dict,
                             num_try=object_noise_config['num_try'],
                             translation_std=object_noise_config['translation_std'],
                             rot_range=object_noise_config['rot_range'])

    # 3. random flip across the x axis
    random_flip_ratio = data_aug_config['random_flip_ratio']
    data_dict = random_flip(data_dict, random_flip_ratio)

    # 4. global rotation, scaling and translation
    global_rot_scale_trans_config = data_aug_config['global_rot_scale_trans']
    rot_range = global_rot_scale_trans_config['rot_range']
    scale_ratio_range = global_rot_scale_trans_config['scale_ratio_range']
    translation_std = global_rot_scale_trans_config['translation_std']
    data_dict = global_rot_scale_trans(data_dict, rot_range, scale_ratio_range, translation_std)

    # 5. points range filter
    point_range = data_aug_config['point_range_filter']
    data_dict = point_range_filter(data_dict, point_range)

    # 6. object range filter
    object_range = data_aug_config['object_range_filter']
    data_dict = object_range_filter(data_dict, object_range)

    # 7. points shuffle
    data_dict = points_shuffle(data_dict)

    # # 8. filter bboxes with label=-1 (disabled; -1 labels are kept)
    # data_dict = filter_bboxes_with_labels(data_dict)
    return data_dict
import random
import numpy as np
import torch
from torch.utils.data import DataLoader
from functools import partial
def collate_fn(list_data):
    '''
    Collate a list of per-sample dicts into a dict of per-field lists;
    numeric arrays become torch tensors, names/info dicts stay as-is.
    (Samples have variable point/box counts, so nothing is stacked.)
    '''
    batch = dict(
        batched_pts=[],
        batched_gt_bboxes=[],
        batched_labels=[],
        batched_names=[],
        batched_difficulty=[],
        batched_img_info=[],
        batched_calib_info=[],
    )
    for data_dict in list_data:
        batch['batched_pts'].append(torch.from_numpy(data_dict['pts']))
        batch['batched_gt_bboxes'].append(torch.from_numpy(data_dict['gt_bboxes_3d']))
        batch['batched_labels'].append(torch.from_numpy(data_dict['gt_labels']))
        batch['batched_names'].append(data_dict['gt_names'])  # List(str)
        batch['batched_difficulty'].append(torch.from_numpy(data_dict['difficulty']))
        batch['batched_img_info'].append(data_dict['image_info'])
        batch['batched_calib_info'].append(data_dict['calib_info'])
    return batch
def get_dataloader(dataset, batch_size, num_workers, shuffle=True, drop_last=False):
    '''Wrap `dataset` in a torch DataLoader that batches with collate_fn.'''
    return DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        drop_last=drop_last,
        collate_fn=collate_fn,
    )
import numpy as np
import os
import torch
from torch.utils.data import Dataset
import sys
BASE = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(BASE))
from pointpillars.utils import read_pickle, read_points, bbox_camera2lidar
from pointpillars.dataset import point_range_filter, data_augment
class BaseSampler():
    '''Cyclic sampler over a fixed list.

    Draws items in (optionally shuffled) order; when the cursor would pass
    the end, it returns only the remaining tail, resets to the start, and
    reshuffles (if enabled) for the next pass. A wrap-around draw may thus
    return fewer than `num` items.
    '''

    def __init__(self, sampled_list, shuffle=True):
        self.total_num = len(sampled_list)
        self.sampled_list = np.array(sampled_list)
        self.indices = np.arange(self.total_num)
        if shuffle:
            np.random.shuffle(self.indices)
        self.shuffle = shuffle
        self.idx = 0

    def sample(self, num):
        '''Return up to `num` items (fewer on wrap-around).'''
        end = self.idx + num
        if end < self.total_num:
            ret = self.sampled_list[self.indices[self.idx:end]]
            self.idx = end
        else:
            # tail draw: hand out what is left and start a new pass
            ret = self.sampled_list[self.indices[self.idx:]]
            self.idx = 0
            if self.shuffle:
                np.random.shuffle(self.indices)
        return ret
class Kitti(Dataset):
    """KITTI 3D-detection dataset.

    Loads per-frame info dicts from `kitti_infos_{split}.pkl`, reads the
    (reduced) velodyne point cloud, converts camera-frame annotation boxes to
    lidar frame, and applies the training augmentation pipeline on
    train/trainval splits.
    """

    # class name -> integer training label
    CLASSES = {
        'Pedestrian': 0,
        'Cyclist': 1,
        'Car': 2
    }

    def __init__(self, data_root, split, pts_prefix='velodyne_reduced'):
        assert split in ['train', 'val', 'trainval', 'test']
        self.data_root = data_root
        self.split = split
        self.pts_prefix = pts_prefix
        self.data_infos = read_pickle(os.path.join(data_root, f'kitti_infos_{split}.pkl'))
        self.sorted_ids = list(self.data_infos.keys())
        # GT database used for gt-aug sampling; always built from the train split
        db_infos = read_pickle(os.path.join(data_root, 'kitti_dbinfos_train.pkl'))
        db_infos = self.filter_db(db_infos)

        db_sampler = {}
        for cat_name in self.CLASSES:
            db_sampler[cat_name] = BaseSampler(db_infos[cat_name], shuffle=True)
        self.data_aug_config=dict(
            db_sampler=dict(
                db_sampler=db_sampler,
                # target instance counts per sample for gt-aug
                sample_groups=dict(Car=15, Pedestrian=10, Cyclist=10)
            ),
            object_noise=dict(
                num_try=100,
                translation_std=[0.25, 0.25, 0.25],
                rot_range=[-0.15707963267, 0.15707963267]  # ~ +/- pi/20
            ),
            random_flip_ratio=0.5,
            global_rot_scale_trans=dict(
                rot_range=[-0.78539816, 0.78539816],  # ~ +/- pi/4
                scale_ratio_range=[0.95, 1.05],
                translation_std=[0, 0, 0]
            ),
            point_range_filter=[0, -39.68, -3, 69.12, 39.68, 1],
            object_range_filter=[0, -39.68, -3, 69.12, 39.68, 1]
        )

    def remove_dont_care(self, annos_info):
        """Drop 'DontCare' entries from every per-annotation array."""
        keep_ids = [i for i, name in enumerate(annos_info['name']) if name != 'DontCare']
        for k, v in annos_info.items():
            annos_info[k] = v[keep_ids]
        return annos_info

    def filter_db(self, db_infos):
        """Filter the GT database: remove difficulty==-1 entries and objects
        with too few lidar points to be useful for pasting."""
        # 1. filter_by_difficulty
        for k, v in db_infos.items():
            db_infos[k] = [item for item in v if item['difficulty'] != -1]

        # 2. filter_by_min_points, dict(Car=5, Pedestrian=10, Cyclist=10)
        filter_thrs = dict(Car=5, Pedestrian=10, Cyclist=10)
        for cat in self.CLASSES:
            filter_thr = filter_thrs[cat]
            db_infos[cat] = [item for item in db_infos[cat] if item['num_points_in_gt'] >= filter_thr]

        return db_infos

    def __getitem__(self, index):
        """Return one sample dict (pts, gt_bboxes_3d, gt_labels, gt_names,
        difficulty, image_info, calib_info), augmented on training splits."""
        data_info = self.data_infos[self.sorted_ids[index]]
        image_info, calib_info, annos_info = \
            data_info['image'], data_info['calib'], data_info['annos']

        # point cloud input
        velodyne_path = data_info['velodyne_path'].replace('velodyne', self.pts_prefix)
        pts_path = os.path.join(self.data_root, velodyne_path)
        pts = read_points(pts_path)

        # calib input: for bbox coordinates transformation between Camera and Lidar.
        # because the KITTI annotations are given in the (rectified) camera frame
        tr_velo_to_cam = calib_info['Tr_velo_to_cam'].astype(np.float32)
        r0_rect = calib_info['R0_rect'].astype(np.float32)

        # annotations input
        annos_info = self.remove_dont_care(annos_info)
        annos_name = annos_info['name']
        annos_location = annos_info['location']
        annos_dimension = annos_info['dimensions']
        rotation_y = annos_info['rotation_y']
        gt_bboxes = np.concatenate([annos_location, annos_dimension, rotation_y[:, None]], axis=1).astype(np.float32)
        gt_bboxes_3d = bbox_camera2lidar(gt_bboxes, tr_velo_to_cam, r0_rect)
        # classes outside CLASSES are labeled -1
        gt_labels = [self.CLASSES.get(name, -1) for name in annos_name]
        data_dict = {
            'pts': pts,
            'gt_bboxes_3d': gt_bboxes_3d,
            'gt_labels': np.array(gt_labels),
            'gt_names': annos_name,
            'difficulty': annos_info['difficulty'],
            'image_info': image_info,
            'calib_info': calib_info
        }
        if self.split in ['train', 'trainval']:
            data_dict = data_augment(self.CLASSES, self.data_root, data_dict, self.data_aug_config)
        else:
            # eval/test: no augmentation, only crop points to the detection range
            data_dict = point_range_filter(data_dict, point_range=self.data_aug_config['point_range_filter'])

        return data_dict

    def __len__(self):
        return len(self.data_infos)
if __name__ == '__main__':
    # Manual smoke test: load one training sample (path is machine-specific).
    kitti_data = Kitti(data_root='/mnt/ssd1/lifa_rdata/det/kitti',
                       split='train')
    kitti_data.__getitem__(9)
from .loss import Loss
\ No newline at end of file
import pdb
import torch
import torch.nn as nn
import torch.nn.functional as F
class Loss(nn.Module):
    """PointPillars training loss: sigmoid focal loss for classification,
    SmoothL1 for box regression, cross-entropy for direction classification,
    combined with the weights cls_w / reg_w / dir_w.
    """

    def __init__(self, alpha=0.25, gamma=2.0, beta=1/9, cls_w=1.0, reg_w=2.0, dir_w=0.2):
        super().__init__()
        # BUGFIX: alpha and gamma were previously hard-coded to 0.25 / 2.0,
        # silently ignoring the constructor arguments.
        self.alpha = alpha
        self.gamma = gamma
        self.cls_w = cls_w
        self.reg_w = reg_w
        self.dir_w = dir_w
        self.smooth_l1_loss = nn.SmoothL1Loss(reduction='none',
                                              beta=beta)
        self.dir_cls = nn.CrossEntropyLoss()

    def forward(self,
                bbox_cls_pred,
                bbox_pred,
                bbox_dir_cls_pred,
                batched_labels,
                num_cls_pos,
                batched_bbox_reg,
                batched_dir_labels):
        '''
        bbox_cls_pred: (n, 3) classification logits
        bbox_pred: (n, 7) box regression predictions
        bbox_dir_cls_pred: (n, 2) direction logits
        batched_labels: (n, ) class indices; value == nclasses marks background
        num_cls_pos: int, number of positive anchors (focal-loss normalizer)
        batched_bbox_reg: (n, 7) regression targets
        batched_dir_labels: (n, ) direction targets in {0, 1}
        return: dict with cls_loss, reg_loss, dir_cls_loss, total_loss
        '''
        # 1. bbox cls loss
        # focal loss: FL = - \alpha_t (1 - p_t)^\gamma * log(p_t)
        # y == 1 -> p_t = p
        # y == 0 -> p_t = 1 - p
        nclasses = bbox_cls_pred.size(1)
        # one-hot over nclasses+1 then crop: background rows become all-zero
        batched_labels = F.one_hot(batched_labels, nclasses + 1)[:, :nclasses].float() # (n, 3)

        bbox_cls_pred_sigmoid = torch.sigmoid(bbox_cls_pred)
        weights = self.alpha * (1 - bbox_cls_pred_sigmoid).pow(self.gamma) * batched_labels + \
            (1 - self.alpha) * bbox_cls_pred_sigmoid.pow(self.gamma) * (1 - batched_labels) # (n, 3)
        cls_loss = F.binary_cross_entropy(bbox_cls_pred_sigmoid, batched_labels, reduction='none')
        cls_loss = cls_loss * weights
        cls_loss = cls_loss.sum() / num_cls_pos

        # 2. regression loss (normalized by the number of anchors, not elements)
        reg_loss = self.smooth_l1_loss(bbox_pred, batched_bbox_reg)
        reg_loss = reg_loss.sum() / reg_loss.size(0)

        # 3. direction cls loss
        dir_cls_loss = self.dir_cls(bbox_dir_cls_pred, batched_dir_labels)

        # 4. total loss
        total_loss = self.cls_w * cls_loss + self.reg_w * reg_loss + self.dir_w * dir_cls_loss

        loss_dict={'cls_loss': cls_loss,
                   'reg_loss': reg_loss,
                   'dir_cls_loss': dir_cls_loss,
                   'total_loss': total_loss}
        return loss_dict
\ No newline at end of file
from .anchors import Anchors, anchors2bboxes, bboxes2deltas
from .pointpillars import PointPillars, PillarLayer, PillarEncoder
import pdb
import numpy as np
import torch
from pointpillars.utils import limit_period, iou2d_nearest
class Anchors():
    """Anchor generator: for each class it lays one anchor size over the BEV
    feature map at every cell center, at each of the given rotations."""

    def __init__(self, ranges, sizes, rotations):
        # ranges: per-class [x1, y1, z1, x2, y2, z2]; sizes: per-class [w, l, h]
        assert len(ranges) == len(sizes)
        self.ranges = ranges
        self.sizes = sizes
        self.rotations = rotations

    def get_anchors(self, feature_map_size, anchor_range, anchor_size, rotations):
        '''
        Generate anchors for a single class.

        feature_map_size: (y_l, x_l) tensor
        anchor_range: [x1, y1, z1, x2, y2, z2]
        anchor_size: [w, l, h]
        rotations: [0, 1.57]
        return: shape=(y_l, x_l, 2, 7)
        '''
        device = feature_map_size.device
        # linspace with l+1 points gives cell edges; adding half a cell below
        # converts them to cell centers
        x_centers = torch.linspace(anchor_range[0], anchor_range[3], feature_map_size[1] + 1, device=device)
        y_centers = torch.linspace(anchor_range[1], anchor_range[4], feature_map_size[0] + 1, device=device)
        z_centers = torch.linspace(anchor_range[2], anchor_range[5], 1 + 1, device=device)

        x_shift = (x_centers[1] - x_centers[0]) / 2
        y_shift = (y_centers[1] - y_centers[0]) / 2
        z_shift = (z_centers[1] - z_centers[0]) / 2
        x_centers = x_centers[:feature_map_size[1]] + x_shift # (feature_map_size[1], )
        y_centers = y_centers[:feature_map_size[0]] + y_shift # (feature_map_size[0], )
        z_centers = z_centers[:1] + z_shift # (1, )

        # [feature_map_size[1], feature_map_size[0], 1, 2] * 4
        # NOTE(review): meshgrid is called without indexing=; relies on the
        # default 'ij' behavior (warns on newer torch) — confirm torch version.
        meshgrids = torch.meshgrid(x_centers, y_centers, z_centers, rotations)
        meshgrids = list(meshgrids)
        for i in range(len(meshgrids)):
            meshgrids[i] = meshgrids[i][..., None] # [feature_map_size[1], feature_map_size[0], 1, 2, 1]

        anchor_size = anchor_size[None, None, None, None, :]
        repeat_shape = [feature_map_size[1], feature_map_size[0], 1, len(rotations), 1]
        anchor_size = anchor_size.repeat(repeat_shape) # [feature_map_size[1], feature_map_size[0], 1, 2, 3]
        meshgrids.insert(3, anchor_size)
        # concatenated layout per anchor: (x, y, z, w, l, h, theta)
        anchors = torch.cat(meshgrids, dim=-1).permute(2, 1, 0, 3, 4).contiguous() # [1, feature_map_size[0], feature_map_size[1], 2, 7]
        return anchors.squeeze(0)

    def get_multi_anchors(self, feature_map_size):
        '''
        Stack the per-class anchor grids along a new class dimension.

        feature_map_size: (y_l, x_l)
        ranges: [[x1, y1, z1, x2, y2, z2], [x1, y1, z1, x2, y2, z2], [x1, y1, z1, x2, y2, z2]]
        sizes: [[w, l, h], [w, l, h], [w, l, h]]
        rotations: [0, 1.57]
        return: shape=(y_l, x_l, 3, 2, 7)
        '''
        device = feature_map_size.device
        ranges = torch.tensor(self.ranges, device=device)
        sizes = torch.tensor(self.sizes, device=device)
        rotations = torch.tensor(self.rotations, device=device)
        multi_anchors = []
        for i in range(len(ranges)):
            anchors = self.get_anchors(feature_map_size=feature_map_size,
                                       anchor_range=ranges[i],
                                       anchor_size=sizes[i],
                                       rotations=rotations)
            multi_anchors.append(anchors[:, :, None, :, :])
        multi_anchors = torch.cat(multi_anchors, dim=2)

        return multi_anchors
def anchors2bboxes(anchors, deltas):
    '''
    Decode regression deltas back into boxes.

    anchors: (M, 7), (x, y, z, w, l, h, theta)
    deltas: (M, 7)
    return: (M, 7)
    '''
    # x/y offsets are normalized by the anchor's BEV diagonal
    diag = torch.sqrt(anchors[:, 3] ** 2 + anchors[:, 4] ** 2)
    cx = anchors[:, 0] + deltas[:, 0] * diag
    cy = anchors[:, 1] + deltas[:, 1] * diag

    # sizes are regressed in log space
    w = anchors[:, 3] * torch.exp(deltas[:, 3])
    l = anchors[:, 4] * torch.exp(deltas[:, 4])
    h = anchors[:, 5] * torch.exp(deltas[:, 5])

    # z: decode at the shifted reference (z + h/2), then shift back by the
    # decoded height
    cz = deltas[:, 2] * anchors[:, 5] + anchors[:, 2] + anchors[:, 5] / 2 - h / 2

    theta = anchors[:, 6] + deltas[:, 6]
    return torch.stack([cx, cy, cz, w, l, h, theta], dim=1)
def bboxes2deltas(bboxes, anchors):
    '''
    Encode boxes as regression deltas relative to their anchors
    (inverse of anchors2bboxes).

    bboxes: (M, 7), (x, y, z, w, l, h, theta)
    anchors: (M, 7)
    return: (M, 7)
    '''
    # x/y offsets are normalized by the anchor's BEV diagonal
    diag = torch.sqrt(anchors[:, 3] ** 2 + anchors[:, 4] ** 2)
    dx = (bboxes[:, 0] - anchors[:, 0]) / diag
    dy = (bboxes[:, 1] - anchors[:, 1]) / diag

    # z: compare at the shifted reference (z + h/2), normalized by anchor height
    dz = ((bboxes[:, 2] + bboxes[:, 5] / 2) - (anchors[:, 2] + anchors[:, 5] / 2)) / anchors[:, 5]

    # sizes in log space
    dw = torch.log(bboxes[:, 3] / anchors[:, 3])
    dl = torch.log(bboxes[:, 4] / anchors[:, 4])
    dh = torch.log(bboxes[:, 5] / anchors[:, 5])

    dtheta = bboxes[:, 6] - anchors[:, 6]
    return torch.stack([dx, dy, dz, dw, dl, dh, dtheta], dim=1)
def anchor_target(batched_anchors, batched_gt_bboxes, batched_gt_labels, assigners, nclasses):
    '''
    Assign classification / regression / direction targets to every anchor.

    batched_anchors: [(y_l, x_l, 3, 2, 7), (y_l, x_l, 3, 2, 7), ... ]
    batched_gt_bboxes: [(n1, 7), (n2, 7), ...]
    batched_gt_labels: [(n1, ), (n2, ), ...]
    assigners: per-class dicts with pos_iou_thr / neg_iou_thr / min_iou_thr
    nclasses: int, number of foreground classes (background label == nclasses)
    return:
        dict = {batched_labels: (bs, n_anchors),
                batched_label_weights: (bs, n_anchors),
                batched_bbox_reg: (bs, n_anchors, 7),
                batched_bbox_reg_weights: (bs, n_anchors),
                batched_dir_labels: (bs, n_anchors),
                batched_dir_labels_weights: (bs, n_anchors)}
    '''
    assert len(batched_anchors) == len(batched_gt_bboxes) == len(batched_gt_labels)
    batch_size = len(batched_anchors)
    n_assigners = len(assigners)
    batched_labels, batched_label_weights = [], []
    batched_bbox_reg, batched_bbox_reg_weights = [], []
    batched_dir_labels, batched_dir_labels_weights = [], []
    for i in range(batch_size):
        anchors = batched_anchors[i]
        gt_bboxes, gt_labels = batched_gt_bboxes[i], batched_gt_labels[i]
        # what we want to get next ?
        # 1. identify positive anchors and negative anchors -> cls
        # 2. identify the regression values -> reg
        # 3. identify the direction -> dir_cls
        multi_labels, multi_label_weights = [], []
        multi_bbox_reg, multi_bbox_reg_weights = [], []
        multi_dir_labels, multi_dir_labels_weights = [], []
        d1, d2, d3, d4, d5 = anchors.size()
        for j in range(n_assigners): # one assigner (threshold set) per class
            assigner = assigners[j]
            pos_iou_thr, neg_iou_thr, min_iou_thr = \
                assigner['pos_iou_thr'], assigner['neg_iou_thr'], assigner['min_iou_thr']
            cur_anchors = anchors[:, :, j, :, :].reshape(-1, 7)
            overlaps = iou2d_nearest(gt_bboxes, cur_anchors)
            max_overlaps, max_overlaps_idx = torch.max(overlaps, dim=0)
            gt_max_overlaps, _ = torch.max(overlaps, dim=1)
            # assigned_gt_inds: -1 = ignore, 0 = negative, k+1 = matched to gt k
            assigned_gt_inds = -torch.ones_like(cur_anchors[:, 0], dtype=torch.long)
            # a. negative anchors
            assigned_gt_inds[max_overlaps < neg_iou_thr] = 0

            # b. positive anchors
            # rule 1: anchors over the positive IoU threshold
            assigned_gt_inds[max_overlaps >= pos_iou_thr] = max_overlaps_idx[max_overlaps >= pos_iou_thr] + 1

            # rule 2: each gt also claims its best-overlapping anchors
            # support one bbox to multi anchors, only if the anchors are with the highest iou.
            # rule 2 may modify the labels generated by rule 1
            # BUGFIX: the original loop reused `i` here, shadowing the batch
            # index from the outer loop; renamed to gt_i.
            for gt_i in range(len(gt_bboxes)):
                if gt_max_overlaps[gt_i] >= min_iou_thr:
                    assigned_gt_inds[overlaps[gt_i] == gt_max_overlaps[gt_i]] = gt_i + 1

            pos_flag = assigned_gt_inds > 0
            neg_flag = assigned_gt_inds == 0
            # 1. anchor labels (background == nclasses; -1 is not usable here,
            #    for some bboxes carry label -1)
            assigned_gt_labels = torch.zeros_like(cur_anchors[:, 0], dtype=torch.long) + nclasses
            assigned_gt_labels[pos_flag] = gt_labels[assigned_gt_inds[pos_flag] - 1].long()
            assigned_gt_labels_weights = torch.zeros_like(cur_anchors[:, 0])
            assigned_gt_labels_weights[pos_flag] = 1
            assigned_gt_labels_weights[neg_flag] = 1

            # 2. anchor regression targets (positives only)
            assigned_gt_reg_weights = torch.zeros_like(cur_anchors[:, 0])
            assigned_gt_reg_weights[pos_flag] = 1

            assigned_gt_reg = torch.zeros_like(cur_anchors)
            positive_anchors = cur_anchors[pos_flag]
            corr_gt_bboxes = gt_bboxes[assigned_gt_inds[pos_flag] - 1]
            assigned_gt_reg[pos_flag] = bboxes2deltas(corr_gt_bboxes, positive_anchors)

            # 3. anchor direction targets: bin the gt yaw into {0, 1}
            assigned_gt_dir_weights = torch.zeros_like(cur_anchors[:, 0])
            assigned_gt_dir_weights[pos_flag] = 1

            assigned_gt_dir = torch.zeros_like(cur_anchors[:, 0], dtype=torch.long)
            dir_cls_targets = limit_period(corr_gt_bboxes[:, 6].cpu(), 0, 2 * np.pi).to(corr_gt_bboxes)
            dir_cls_targets = torch.floor(dir_cls_targets / np.pi).long()
            assigned_gt_dir[pos_flag] = torch.clamp(dir_cls_targets, min=0, max=1)

            multi_labels.append(assigned_gt_labels.reshape(d1, d2, 1, d4))
            multi_label_weights.append(assigned_gt_labels_weights.reshape(d1, d2, 1, d4))
            multi_bbox_reg.append(assigned_gt_reg.reshape(d1, d2, 1, d4, -1))
            multi_bbox_reg_weights.append(assigned_gt_reg_weights.reshape(d1, d2, 1, d4))
            multi_dir_labels.append(assigned_gt_dir.reshape(d1, d2, 1, d4))
            multi_dir_labels_weights.append(assigned_gt_dir_weights.reshape(d1, d2, 1, d4))

        # re-interleave the per-class grids, then flatten to (n_anchors, ...)
        multi_labels = torch.cat(multi_labels, dim=-2).reshape(-1)
        multi_label_weights = torch.cat(multi_label_weights, dim=-2).reshape(-1)
        multi_bbox_reg = torch.cat(multi_bbox_reg, dim=-3).reshape(-1, d5)
        multi_bbox_reg_weights = torch.cat(multi_bbox_reg_weights, dim=-2).reshape(-1)
        multi_dir_labels = torch.cat(multi_dir_labels, dim=-2).reshape(-1)
        multi_dir_labels_weights = torch.cat(multi_dir_labels_weights, dim=-2).reshape(-1)

        batched_labels.append(multi_labels)
        batched_label_weights.append(multi_label_weights)
        batched_bbox_reg.append(multi_bbox_reg)
        batched_bbox_reg_weights.append(multi_bbox_reg_weights)
        batched_dir_labels.append(multi_dir_labels)
        batched_dir_labels_weights.append(multi_dir_labels_weights)

    rt_dict = dict(
        batched_labels=torch.stack(batched_labels, 0), # (bs, y_l * x_l * 3 * 2)
        batched_label_weights=torch.stack(batched_label_weights, 0), # (bs, y_l * x_l * 3 * 2)
        batched_bbox_reg=torch.stack(batched_bbox_reg, 0), # (bs, y_l * x_l * 3 * 2, 7)
        batched_bbox_reg_weights=torch.stack(batched_bbox_reg_weights, 0), # (bs, y_l * x_l * 3 * 2)
        batched_dir_labels=torch.stack(batched_dir_labels, 0), # (bs, y_l * x_l * 3 * 2)
        batched_dir_labels_weights=torch.stack(batched_dir_labels_weights, 0) # (bs, y_l * x_l * 3 * 2)
    )

    return rt_dict
\ No newline at end of file
import numpy as np
import pdb
import torch
import torch.nn as nn
import torch.nn.functional as F
from pointpillars.model.anchors import Anchors, anchor_target, anchors2bboxes
from pointpillars.ops import Voxelization, nms_cuda
from pointpillars.utils import limit_period
class PillarLayer(nn.Module):
    """Voxelize raw point clouds into pillars via the project's Voxelization op."""

    def __init__(self, voxel_size, point_cloud_range, max_num_points, max_voxels):
        super().__init__()
        self.voxel_layer = Voxelization(voxel_size=voxel_size,
                                        point_cloud_range=point_cloud_range,
                                        max_num_points=max_num_points,
                                        max_voxels=max_voxels)

    @torch.no_grad()
    def forward(self, batched_pts):
        '''
        Voxelize each sample, then concatenate all samples along the pillar
        dimension, prefixing each pillar's coordinates with its batch index.

        batched_pts: list[tensor], len(batched_pts) = bs
        return:
               pillars: (p1 + p2 + ... + pb, num_points, c),
               coors_batch: (p1 + p2 + ... + pb, 1 + 3),
               num_points_per_pillar: (p1 + p2 + ... + pb, ), (b: batch size)
        '''
        pillars, coors, npoints_per_pillar = [], [], []
        for i, pts in enumerate(batched_pts):
            voxels_out, coors_out, num_points_per_voxel_out = self.voxel_layer(pts)
            # voxels_out: (max_voxel, num_points, c), coors_out: (max_voxel, 3)
            # num_points_per_voxel_out: (max_voxel, )
            pillars.append(voxels_out)
            coors.append(coors_out.long())
            npoints_per_pillar.append(num_points_per_voxel_out)

        pillars = torch.cat(pillars, dim=0) # (p1 + p2 + ... + pb, num_points, c)
        npoints_per_pillar = torch.cat(npoints_per_pillar, dim=0) # (p1 + p2 + ... + pb, )
        coors_batch = []
        for i, cur_coors in enumerate(coors):
            # left-pad each coordinate row with the sample index i
            coors_batch.append(F.pad(cur_coors, (1, 0), value=i))
        coors_batch = torch.cat(coors_batch, dim=0) # (p1 + p2 + ... + pb, 1 + 3)

        return pillars, coors_batch, npoints_per_pillar
class PillarEncoder(nn.Module):
    """Encode the points of each pillar into a single feature vector
    (PointNet-style shared linear layer + max pool), then scatter the
    vectors back onto a dense BEV pseudo-image of shape (bs, C, y_l, x_l).
    """
    def __init__(self, voxel_size, point_cloud_range, in_channel, out_channel):
        super().__init__()
        self.out_channel = out_channel
        self.vx, self.vy = voxel_size[0], voxel_size[1]
        # center of grid cell (0, 0) in lidar coordinates
        self.x_offset = voxel_size[0] / 2 + point_cloud_range[0]
        self.y_offset = voxel_size[1] / 2 + point_cloud_range[1]
        # BEV grid resolution (number of cells along x and y)
        self.x_l = int((point_cloud_range[3] - point_cloud_range[0]) / voxel_size[0])
        self.y_l = int((point_cloud_range[4] - point_cloud_range[1]) / voxel_size[1])
        # 1x1 conv == linear layer shared across points of a pillar
        self.conv = nn.Conv1d(in_channel, out_channel, 1, bias=False)
        self.bn = nn.BatchNorm1d(out_channel, eps=1e-3, momentum=0.01)

    def forward(self, pillars, coors_batch, npoints_per_pillar):
        '''
        pillars: (p1 + p2 + ... + pb, num_points, c), c = 4
        coors_batch: (p1 + p2 + ... + pb, 1 + 3)
        npoints_per_pillar: (p1 + p2 + ... + pb, )
        return: (bs, out_channel, y_l, x_l)
        '''
        device = pillars.device
        # 1. offset of each point to the mean of real points in its pillar
        #    (padded zero points are included in the sum but divided by the
        #    real count; the mask in step 4 zeroes their features afterwards)
        offset_pt_center = pillars[:, :, :3] - torch.sum(pillars[:, :, :3], dim=1, keepdim=True) / npoints_per_pillar[:, None, None] # (p1 + p2 + ... + pb, num_points, 3)
        # 2. offset of each point to its pillar's geometric center
        #    NOTE(review): assumes coors_batch columns are (batch, x, y, z) — confirm against Voxelization output order
        x_offset_pi_center = pillars[:, :, :1] - (coors_batch[:, None, 1:2] * self.vx + self.x_offset) # (p1 + p2 + ... + pb, num_points, 1)
        y_offset_pi_center = pillars[:, :, 1:2] - (coors_batch[:, None, 2:3] * self.vy + self.y_offset) # (p1 + p2 + ... + pb, num_points, 1)
        # 3. assemble the 9-channel point feature: (x, y, z, r, dx_c, dy_c, dz_c, dx_p, dy_p)
        features = torch.cat([pillars, offset_pt_center, x_offset_pi_center, y_offset_pi_center], dim=-1) # (p1 + p2 + ... + pb, num_points, 9)
        features[:, :, 0:1] = x_offset_pi_center # overwrite absolute x with pillar offset
        features[:, :, 1:2] = y_offset_pi_center # overwrite absolute y with pillar offset
        # Consistent with mmdet3d.
        # The reason can be referenced to https://github.com/open-mmlab/mmdetection3d/issues/1150
        # 4. zero out features of padding points ((0, 0, 0) fill slots)
        voxel_ids = torch.arange(0, pillars.size(1)).to(device) # (num_points, )
        mask = voxel_ids[:, None] < npoints_per_pillar[None, :] # (num_points, p1 + p2 + ... + pb)
        mask = mask.permute(1, 0).contiguous() # (p1 + p2 + ... + pb, num_points)
        features *= mask[:, :, None]
        # 5. shared linear embedding + max pool over the points of each pillar
        features = features.permute(0, 2, 1).contiguous() # (p1 + p2 + ... + pb, 9, num_points)
        features = F.relu(self.bn(self.conv(features)))  # (p1 + p2 + ... + pb, out_channels, num_points)
        pooling_features = torch.max(features, dim=-1)[0] # (p1 + p2 + ... + pb, out_channels)
        # 6. scatter pillar features onto a dense per-sample canvas
        batched_canvas = []
        # NOTE(review): relies on coors_batch being sorted by batch index so the
        # last row holds the largest index — holds for PillarLayer's concat order
        bs = coors_batch[-1, 0] + 1
        for i in range(bs):
            cur_coors_idx = coors_batch[:, 0] == i
            cur_coors = coors_batch[cur_coors_idx, :]
            cur_features = pooling_features[cur_coors_idx]
            # canvas indexed as (x, y, C); permute below yields (C, y_l, x_l)
            canvas = torch.zeros((self.x_l, self.y_l, self.out_channel), dtype=torch.float32, device=device)
            canvas[cur_coors[:, 1], cur_coors[:, 2]] = cur_features
            canvas = canvas.permute(2, 1, 0).contiguous()
            batched_canvas.append(canvas)
        batched_canvas = torch.stack(batched_canvas, dim=0) # (bs, out_channel, self.y_l, self.x_l)
        return batched_canvas
class Backbone(nn.Module):
    """Multi-scale 2D CNN over the BEV pseudo-image.

    Builds one downsampling stage per entry of `layer_strides`: a strided
    3x3 conv followed by `layer_nums[i]` stride-1 3x3 convs, each with
    BatchNorm + ReLU. Returns the feature map of every stage.

    Fix vs original: the default for `layer_strides` was a mutable list
    (`[2, 2, 2]`); replaced with an equivalent tuple. Behavior and the
    accepted argument types are unchanged.
    """

    def __init__(self, in_channel, out_channels, layer_nums, layer_strides=(2, 2, 2)):
        super().__init__()
        assert len(out_channels) == len(layer_nums)
        assert len(out_channels) == len(layer_strides)

        self.multi_blocks = nn.ModuleList()
        for i in range(len(layer_strides)):
            # stage i: one strided conv, then layer_nums[i] stride-1 convs
            blocks = [
                nn.Conv2d(in_channel, out_channels[i], 3, stride=layer_strides[i], bias=False, padding=1),
                nn.BatchNorm2d(out_channels[i], eps=1e-3, momentum=0.01),
                nn.ReLU(inplace=True),
            ]
            for _ in range(layer_nums[i]):
                blocks.append(nn.Conv2d(out_channels[i], out_channels[i], 3, bias=False, padding=1))
                blocks.append(nn.BatchNorm2d(out_channels[i], eps=1e-3, momentum=0.01))
                blocks.append(nn.ReLU(inplace=True))
            in_channel = out_channels[i]
            self.multi_blocks.append(nn.Sequential(*blocks))

        # weight init consistent with mmdet3d
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        '''
        x: (b, c, y_l, x_l). Default: (6, 64, 496, 432)
        return: list of per-stage feature maps.
            Default: [(6, 64, 248, 216), (6, 128, 124, 108), (6, 256, 62, 54)]
        '''
        outs = []
        for block in self.multi_blocks:
            x = block(x)
            outs.append(x)
        return outs
class Neck(nn.Module):
    """Upsample each backbone scale to a common resolution and concatenate.

    One ConvTranspose2d (+BN+ReLU) per input scale; the upsample stride
    equals the transposed-conv kernel size, so spatial dims scale exactly
    by `upsample_strides[i]`.
    """

    def __init__(self, in_channels, upsample_strides, out_channels):
        super().__init__()
        assert len(in_channels) == len(upsample_strides)
        assert len(upsample_strides) == len(out_channels)

        self.decoder_blocks = nn.ModuleList()
        for c_in, up_stride, c_out in zip(in_channels, upsample_strides, out_channels):
            self.decoder_blocks.append(nn.Sequential(
                nn.ConvTranspose2d(c_in,
                                   c_out,
                                   up_stride,
                                   stride=up_stride,
                                   bias=False),
                nn.BatchNorm2d(c_out, eps=1e-3, momentum=0.01),
                nn.ReLU(inplace=True),
            ))

        # weight init consistent with mmdet3d
        for m in self.modules():
            if isinstance(m, nn.ConvTranspose2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        '''
        x: list of feature maps, e.g. [(bs, 64, 248, 216), (bs, 128, 124, 108), (bs, 256, 62, 54)]
        return: channel-wise concatenation, e.g. (bs, 384, 248, 216)
        '''
        upsampled = [block(feat) for block, feat in zip(self.decoder_blocks, x)]
        return torch.cat(upsampled, dim=1)
class Head(nn.Module):
    """Detection head: three parallel 1x1 convs producing per-anchor class
    scores, 7-dof box regression offsets, and 2-bin direction logits.
    """

    def __init__(self, in_channel, n_anchors, n_classes):
        super().__init__()
        self.conv_cls = nn.Conv2d(in_channel, n_anchors*n_classes, 1)
        self.conv_reg = nn.Conv2d(in_channel, n_anchors*7, 1)
        self.conv_dir_cls = nn.Conv2d(in_channel, n_anchors*2, 1)

        # Init consistent with mmdet3d: all weights N(0, 0.01); the first
        # conv (classification) gets a focal-loss prior bias, the rest zero.
        conv_layers = [m for m in self.modules() if isinstance(m, nn.Conv2d)]
        for layer_idx, m in enumerate(conv_layers):
            nn.init.normal_(m.weight, mean=0, std=0.01)
            if layer_idx == 0:
                prior_prob = 0.01
                bias_init = float(-np.log((1 - prior_prob) / prior_prob))
                nn.init.constant_(m.bias, bias_init)
            else:
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        '''
        x: (bs, 384, 248, 216)
        return:
              bbox_cls_pred: (bs, n_anchors*3, 248, 216)
              bbox_pred: (bs, n_anchors*7, 248, 216)
              bbox_dir_cls_pred: (bs, n_anchors*2, 248, 216)
        '''
        return self.conv_cls(x), self.conv_reg(x), self.conv_dir_cls(x)
class PointPillars(nn.Module):
    """End-to-end PointPillars 3D detector.

    Pipeline: raw points -> pillars (PillarLayer) -> BEV pseudo-image
    (PillarEncoder) -> multi-scale CNN (Backbone) -> upsample + concat
    (Neck) -> per-anchor predictions (Head). In 'train' mode it also
    computes anchor targets; in 'val'/'test' modes it decodes predictions
    and applies per-class NMS.
    """
    def __init__(self,
                 nclasses=3,
                 voxel_size=[0.16, 0.16, 4],
                 point_cloud_range=[0, -39.68, -3, 69.12, 39.68, 1],
                 max_num_points=32,
                 max_voxels=(16000, 40000)):
        super().__init__()
        self.nclasses = nclasses
        self.pillar_layer = PillarLayer(voxel_size=voxel_size,
                                        point_cloud_range=point_cloud_range,
                                        max_num_points=max_num_points,
                                        max_voxels=max_voxels)
        self.pillar_encoder = PillarEncoder(voxel_size=voxel_size,
                                            point_cloud_range=point_cloud_range,
                                            in_channel=9,
                                            out_channel=64)
        self.backbone = Backbone(in_channel=64,
                                 out_channels=[64, 128, 256],
                                 layer_nums=[3, 5, 5])
        self.neck = Neck(in_channels=[64, 128, 256],
                         upsample_strides=[1, 2, 4],
                         out_channels=[128, 128, 128])
        self.head = Head(in_channel=384, n_anchors=2*nclasses, n_classes=nclasses)
        # anchors: one (range, size) entry per class — presumably ordered
        # Pedestrian, Cyclist, Car to match the dataset label ids; TODO confirm
        ranges = [[0, -39.68, -0.6, 69.12, 39.68, -0.6],
                  [0, -39.68, -0.6, 69.12, 39.68, -0.6],
                  [0, -39.68, -1.78, 69.12, 39.68, -1.78]]
        sizes = [[0.6, 0.8, 1.73], [0.6, 1.76, 1.73], [1.6, 3.9, 1.56]]
        rotations=[0, 1.57]  # two yaw bins per location (~0 and ~pi/2)
        self.anchors_generator = Anchors(ranges=ranges,
                                         sizes=sizes,
                                         rotations=rotations)
        # train: per-class IoU thresholds for anchor-target assignment
        self.assigners = [
            {'pos_iou_thr': 0.5, 'neg_iou_thr': 0.35, 'min_iou_thr': 0.35},
            {'pos_iou_thr': 0.5, 'neg_iou_thr': 0.35, 'min_iou_thr': 0.35},
            {'pos_iou_thr': 0.6, 'neg_iou_thr': 0.45, 'min_iou_thr': 0.45},
        ]
        # val and test: post-processing hyper-parameters
        self.nms_pre = 100    # top-k scored anchors kept before NMS
        self.nms_thr = 0.01   # BEV IoU threshold for NMS
        self.score_thr = 0.1  # per-class score cutoff
        self.max_num = 50     # max detections returned per sample
    def get_predicted_bboxes_single(self, bbox_cls_pred, bbox_pred, bbox_dir_cls_pred, anchors):
        '''
        Decode and NMS-filter the head outputs of ONE sample.
        bbox_cls_pred: (n_anchors*3, 248, 216)
        bbox_pred: (n_anchors*7, 248, 216)
        bbox_dir_cls_pred: (n_anchors*2, 248, 216)
        anchors: (y_l, x_l, 3, 2, 7)
        return:
            dict(lidar_bboxes=(k, 7), labels=(k, ), scores=(k, )) as numpy
            arrays, or ([], [], []) when no box survives score filtering.
            NOTE(review): the two return types are inconsistent; callers must
            handle both — consider returning an empty dict result instead.
        '''
        # 0. pre-process: flatten spatial/anchor dims to one anchor axis
        bbox_cls_pred = bbox_cls_pred.permute(1, 2, 0).reshape(-1, self.nclasses)
        bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, 7)
        bbox_dir_cls_pred = bbox_dir_cls_pred.permute(1, 2, 0).reshape(-1, 2)
        anchors = anchors.reshape(-1, 7)
        bbox_cls_pred = torch.sigmoid(bbox_cls_pred)
        bbox_dir_cls_pred = torch.max(bbox_dir_cls_pred, dim=1)[1]  # argmax over the 2 direction bins
        # 1. keep self.nms_pre anchors with the highest per-anchor max score
        #    (assumes the anchor count is >= self.nms_pre)
        inds = bbox_cls_pred.max(1)[0].topk(self.nms_pre)[1]
        bbox_cls_pred = bbox_cls_pred[inds]
        bbox_pred = bbox_pred[inds]
        bbox_dir_cls_pred = bbox_dir_cls_pred[inds]
        anchors = anchors[inds]
        # 2. decode predicted offsets to bboxes
        bbox_pred = anchors2bboxes(anchors, bbox_pred)
        # 3. nms on axis-aligned BEV boxes (x1, y1, x2, y2, theta)
        bbox_pred2d_xy = bbox_pred[:, [0, 1]]
        bbox_pred2d_lw = bbox_pred[:, [3, 4]]
        bbox_pred2d = torch.cat([bbox_pred2d_xy - bbox_pred2d_lw / 2,
                                 bbox_pred2d_xy + bbox_pred2d_lw / 2,
                                 bbox_pred[:, 6:]], dim=-1) # (n_anchors, 5)
        ret_bboxes, ret_labels, ret_scores = [], [], []
        for i in range(self.nclasses):
            # 3.1 filter bboxes with scores below self.score_thr
            cur_bbox_cls_pred = bbox_cls_pred[:, i]
            score_inds = cur_bbox_cls_pred > self.score_thr
            if score_inds.sum() == 0:
                continue
            cur_bbox_cls_pred = cur_bbox_cls_pred[score_inds]
            cur_bbox_pred2d = bbox_pred2d[score_inds]
            cur_bbox_pred = bbox_pred[score_inds]
            cur_bbox_dir_cls_pred = bbox_dir_cls_pred[score_inds]
            # 3.2 nms core
            keep_inds = nms_cuda(boxes=cur_bbox_pred2d,
                                 scores=cur_bbox_cls_pred,
                                 thresh=self.nms_thr,
                                 pre_maxsize=None,
                                 post_max_size=None)
            cur_bbox_cls_pred = cur_bbox_cls_pred[keep_inds]
            cur_bbox_pred = cur_bbox_pred[keep_inds]
            cur_bbox_dir_cls_pred = cur_bbox_dir_cls_pred[keep_inds]
            # fold yaw into [-pi, 0); the cpu round-trip is presumably a
            # workaround for limit_period on GPU tensors — TODO confirm
            cur_bbox_pred[:, -1] = limit_period(cur_bbox_pred[:, -1].detach().cpu(), 1, np.pi).to(cur_bbox_pred) # [-pi, 0]
            # flip yaw by pi when the direction classifier picked bin 0
            cur_bbox_pred[:, -1] += (1 - cur_bbox_dir_cls_pred) * np.pi
            ret_bboxes.append(cur_bbox_pred)
            ret_labels.append(torch.zeros_like(cur_bbox_pred[:, 0], dtype=torch.long) + i)
            ret_scores.append(cur_bbox_cls_pred)
        # 4. filter some bboxes if bboxes number is above self.max_num
        if len(ret_bboxes) == 0:
            return [], [], []
        ret_bboxes = torch.cat(ret_bboxes, 0)
        ret_labels = torch.cat(ret_labels, 0)
        ret_scores = torch.cat(ret_scores, 0)
        if ret_bboxes.size(0) > self.max_num:
            final_inds = ret_scores.topk(self.max_num)[1]
            ret_bboxes = ret_bboxes[final_inds]
            ret_labels = ret_labels[final_inds]
            ret_scores = ret_scores[final_inds]
        result = {
            'lidar_bboxes': ret_bboxes.detach().cpu().numpy(),
            'labels': ret_labels.detach().cpu().numpy(),
            'scores': ret_scores.detach().cpu().numpy()
        }
        return result
    def get_predicted_bboxes(self, bbox_cls_pred, bbox_pred, bbox_dir_cls_pred, batched_anchors):
        '''
        Decode and NMS-filter the head outputs for every sample of a batch.
        bbox_cls_pred: (bs, n_anchors*3, 248, 216)
        bbox_pred: (bs, n_anchors*7, 248, 216)
        bbox_dir_cls_pred: (bs, n_anchors*2, 248, 216)
        batched_anchors: (bs, y_l, x_l, 3, 2, 7)
        return: list of per-sample results from get_predicted_bboxes_single
            (a dict of lidar_bboxes/labels/scores, or ([], [], []) when empty)
        '''
        results = []
        bs = bbox_cls_pred.size(0)
        for i in range(bs):
            result = self.get_predicted_bboxes_single(bbox_cls_pred=bbox_cls_pred[i],
                                                      bbox_pred=bbox_pred[i],
                                                      bbox_dir_cls_pred=bbox_dir_cls_pred[i],
                                                      anchors=batched_anchors[i])
            results.append(result)
        return results
    def forward(self, batched_pts, mode='test', batched_gt_bboxes=None, batched_gt_labels=None):
        '''
        batched_pts: list[tensor], one point tensor per sample
        mode: 'train' | 'val' | 'test' (anything else raises ValueError)
        batched_gt_bboxes / batched_gt_labels: required only in 'train' mode
        return:
            'train': (bbox_cls_pred, bbox_pred, bbox_dir_cls_pred, anchor_target_dict)
            'val' / 'test': list of per-sample decoded results
        '''
        batch_size = len(batched_pts)
        # batched_pts: list[tensor] -> pillars: (p1 + p2 + ... + pb, num_points, c),
        #                              coors_batch: (p1 + p2 + ... + pb, 1 + 3),
        #                              num_points_per_pillar: (p1 + p2 + ... + pb, ), (b: batch size)
        pillars, coors_batch, npoints_per_pillar = self.pillar_layer(batched_pts)
        # pillars: (p1 + p2 + ... + pb, num_points, c), c = 4
        # coors_batch: (p1 + p2 + ... + pb, 1 + 3)
        # npoints_per_pillar: (p1 + p2 + ... + pb, )
        #                     -> pillar_features: (bs, out_channel, y_l, x_l)
        pillar_features = self.pillar_encoder(pillars, coors_batch, npoints_per_pillar)
        # xs:  [(bs, 64, 248, 216), (bs, 128, 124, 108), (bs, 256, 62, 54)]
        xs = self.backbone(pillar_features)
        # x: (bs, 384, 248, 216)
        x = self.neck(xs)
        # bbox_cls_pred: (bs, n_anchors*3, 248, 216)
        # bbox_pred: (bs, n_anchors*7, 248, 216)
        # bbox_dir_cls_pred: (bs, n_anchors*2, 248, 216)
        bbox_cls_pred, bbox_pred, bbox_dir_cls_pred = self.head(x)
        # anchors: generated once per forward for the head's spatial size,
        # identical for every sample of the batch
        device = bbox_cls_pred.device
        feature_map_size = torch.tensor(list(bbox_cls_pred.size()[-2:]), device=device)
        anchors = self.anchors_generator.get_multi_anchors(feature_map_size)
        batched_anchors = [anchors for _ in range(batch_size)]
        if mode == 'train':
            anchor_target_dict = anchor_target(batched_anchors=batched_anchors,
                                               batched_gt_bboxes=batched_gt_bboxes,
                                               batched_gt_labels=batched_gt_labels,
                                               assigners=self.assigners,
                                               nclasses=self.nclasses)
            return bbox_cls_pred, bbox_pred, bbox_dir_cls_pred, anchor_target_dict
        elif mode == 'val':
            results = self.get_predicted_bboxes(bbox_cls_pred=bbox_cls_pred,
                                                bbox_pred=bbox_pred,
                                                bbox_dir_cls_pred=bbox_dir_cls_pred,
                                                batched_anchors=batched_anchors)
            return results
        elif mode == 'test':
            results = self.get_predicted_bboxes(bbox_cls_pred=bbox_cls_pred,
                                                bbox_pred=bbox_pred,
                                                bbox_dir_cls_pred=bbox_dir_cls_pred,
                                                batched_anchors=batched_anchors)
            return results
        else:
            raise ValueError
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment