Commit d1aac35d authored by zhangwenwei

Initial commit

from mmdet3d.utils import Registry
OBJECTSAMPLERS = Registry('object_sampler')
from collections.abc import Sequence
import mmcv
import numpy as np
import torch
def remove_dontcare(image_anno):
img_filtered_annotations = {}
relevant_annotation_indices = [
i for i, x in enumerate(image_anno['name']) if x != 'DontCare'
]
for key in image_anno.keys():
img_filtered_annotations[key] = (
image_anno[key][relevant_annotation_indices])
return img_filtered_annotations
def to_tensor(data):
# TODO: remove this duplicated method in the future
"""Convert objects of various python types to :obj:`torch.Tensor`.
Supported types are: :class:`numpy.ndarray`, :class:`torch.Tensor`,
:class:`Sequence`, :class:`int` and :class:`float`.
"""
if isinstance(data, torch.Tensor):
return data
elif isinstance(data, np.ndarray):
return torch.from_numpy(data)
elif isinstance(data, Sequence) and not mmcv.is_str(data):
return torch.tensor(data)
elif isinstance(data, int):
return torch.LongTensor([data])
elif isinstance(data, float):
return torch.FloatTensor([data])
else:
raise TypeError('type {} cannot be converted to tensor.'.format(
type(data)))
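# A minimal usage sketch of `to_tensor` (inputs are illustrative):
#
#     to_tensor(np.zeros((2, 3)))   # -> float64 tensor of shape (2, 3)
#     to_tensor([1, 2, 3])          # -> tensor([1, 2, 3])
#     to_tensor(5)                  # -> tensor([5]) (LongTensor)
#     to_tensor(2.5)                # -> tensor([2.5000]) (FloatTensor)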
from .anchor_heads import * # noqa: F401,F403
from .backbones import * # noqa: F401,F403
from .bbox_heads import * # noqa: F401,F403
from .builder import (build_backbone, build_detector, build_head, build_loss,
build_neck, build_roi_extractor, build_shared_head)
from .detectors import * # noqa: F401,F403
from .fusion_layers import * # noqa: F401,F403
from .losses import * # noqa: F401,F403
from .middle_encoders import * # noqa: F401,F403
from .necks import * # noqa: F401,F403
from .registry import (BACKBONES, DETECTORS, HEADS, LOSSES, MIDDLE_ENCODERS,
NECKS, ROI_EXTRACTORS, SHARED_HEADS, VOXEL_ENCODERS)
from .roi_extractors import * # noqa: F401,F403
from .voxel_encoders import * # noqa: F401,F403
__all__ = [
'BACKBONES', 'NECKS', 'ROI_EXTRACTORS', 'SHARED_HEADS', 'HEADS', 'LOSSES',
'VOXEL_ENCODERS', 'MIDDLE_ENCODERS', 'DETECTORS', 'build_backbone',
'build_neck', 'build_roi_extractor', 'build_shared_head', 'build_head',
'build_loss', 'build_detector'
]
from .boxvelo_head import Anchor3DVeloHead
from .second_head import SECONDHead
__all__ = ['Anchor3DVeloHead', 'SECONDHead']
import numpy as np
import torch
from mmcv.cnn import normal_init
from mmdet3d.core import box_torch_ops, boxes3d_to_bev_torch_lidar
from mmdet3d.ops.iou3d.iou3d_utils import nms_gpu, nms_normal_gpu
from ..registry import HEADS
from ..utils import bias_init_with_prob
from .second_head import SECONDHead
@HEADS.register_module
class Anchor3DVeloHead(SECONDHead):
"""Anchor-based head for 3D anchor with velocity
Args:
in_channels (int): Number of channels in the input feature map.
feat_channels (int): Number of channels of the feature map.
anchor_scales (Iterable): Anchor scales.
anchor_ratios (Iterable): Anchor aspect ratios.
anchor_strides (Iterable): Anchor strides.
anchor_base_sizes (Iterable): Anchor base sizes.
target_means (Iterable): Mean values of regression targets.
target_stds (Iterable): Std values of regression targets.
loss_cls (dict): Config of classification loss.
loss_bbox (dict): Config of localization loss.
""" # noqa: W605
def __init__(self,
class_names,
num_classes,
in_channels,
train_cfg,
test_cfg,
cache_anchor=False,
feat_channels=256,
use_direction_classifier=True,
encode_bg_as_zeros=False,
box_code_size=9,
anchor_generator=dict(type='AnchorGeneratorRange', ),
anchor_range=[0, -39.68, -1.78, 69.12, 39.68, -1.78],
anchor_strides=[2],
anchor_sizes=[[1.6, 3.9, 1.56]],
anchor_rotations=[0, 1.57],
anchor_custom_values=[0, 0],
assigner_per_size=False,
assign_per_class=False,
diff_rad_by_sin=True,
dir_offset=0,
dir_limit_offset=1,
target_means=(.0, .0, .0, .0),
target_stds=(1.0, 1.0, 1.0, 1.0),
bbox_coder=dict(type='ResidualCoder', ),
loss_cls=dict(
type='CrossEntropyLoss',
use_sigmoid=True,
loss_weight=1.0),
loss_bbox=dict(
type='SmoothL1Loss', beta=1.0 / 9.0, loss_weight=2.0),
loss_dir=dict(type='CrossEntropyLoss', loss_weight=0.2)):
super().__init__(class_names, in_channels, train_cfg, test_cfg,
cache_anchor, feat_channels, use_direction_classifier,
encode_bg_as_zeros, box_code_size, anchor_generator,
anchor_range, anchor_strides, anchor_sizes,
anchor_rotations, anchor_custom_values,
assigner_per_size, assign_per_class, diff_rad_by_sin,
dir_offset, dir_limit_offset, target_means,
target_stds, bbox_coder, loss_cls, loss_bbox,
loss_dir)
self.num_classes = num_classes
# build head layers & losses
if not self.use_sigmoid_cls:
self.num_classes += 1
self._init_layers()
def init_weights(self):
# pass
# use the initialization when ready
bias_cls = bias_init_with_prob(0.01)
normal_init(self.conv_cls, std=0.01, bias=bias_cls)
normal_init(self.conv_reg, std=0.01)
@staticmethod
def add_sin_difference(boxes1, boxes2):
        # Caution: the 7th dim (index 6) is the rotation,
        # i.e. the last dim before the velocity components
rad_pred_encoding = torch.sin(boxes1[..., 6:7]) * torch.cos(
boxes2[..., 6:7])
rad_tg_encoding = torch.cos(boxes1[..., 6:7]) * torch.sin(boxes2[...,
6:7])
boxes1 = torch.cat(
[boxes1[..., :6], rad_pred_encoding, boxes1[..., 7:]], dim=-1)
boxes2 = torch.cat([boxes2[..., :6], rad_tg_encoding, boxes2[..., 7:]],
dim=-1)
return boxes1, boxes2
def get_bboxes_single(self,
cls_scores,
bbox_preds,
dir_cls_preds,
mlvl_anchors,
input_meta,
rescale=False):
assert len(cls_scores) == len(bbox_preds) == len(mlvl_anchors)
mlvl_bboxes = []
mlvl_scores = []
mlvl_dir_scores = []
for cls_score, bbox_pred, dir_cls_pred, anchors in zip(
cls_scores, bbox_preds, dir_cls_preds, mlvl_anchors):
assert cls_score.size()[-2:] == bbox_pred.size()[-2:]
assert cls_score.size()[-2:] == dir_cls_pred.size()[-2:]
dir_cls_pred = dir_cls_pred.permute(1, 2, 0).reshape(-1, 2)
dir_cls_score = torch.max(dir_cls_pred, dim=-1)[1]
cls_score = cls_score.permute(1, 2,
0).reshape(-1, self.num_classes)
if self.use_sigmoid_cls:
scores = cls_score.sigmoid()
else:
scores = cls_score.softmax(-1)
bbox_pred = bbox_pred.permute(1, 2,
0).reshape(-1, self.box_code_size)
nms_pre = self.test_cfg.get('nms_pre', -1)
if nms_pre > 0 and scores.shape[0] > nms_pre:
if self.use_sigmoid_cls:
max_scores, _ = scores.max(dim=1)
else:
max_scores, _ = scores[:, :-1].max(dim=1)
_, topk_inds = max_scores.topk(nms_pre)
anchors = anchors[topk_inds, :]
bbox_pred = bbox_pred[topk_inds, :]
scores = scores[topk_inds, :]
dir_cls_score = dir_cls_score[topk_inds]
bboxes = self.bbox_coder.decode_torch(anchors, bbox_pred,
self.target_means,
self.target_stds)
mlvl_bboxes.append(bboxes)
mlvl_scores.append(scores)
mlvl_dir_scores.append(dir_cls_score)
mlvl_bboxes = torch.cat(mlvl_bboxes)
mlvl_bboxes_for_nms = boxes3d_to_bev_torch_lidar(mlvl_bboxes)
mlvl_scores = torch.cat(mlvl_scores)
mlvl_dir_scores = torch.cat(mlvl_dir_scores)
if self.use_sigmoid_cls:
# Add a dummy background class to the front when using sigmoid
padding = mlvl_scores.new_zeros(mlvl_scores.shape[0], 1)
mlvl_scores = torch.cat([mlvl_scores, padding], dim=1)
score_thr = self.test_cfg.get('score_thr', 0)
result = self.multiclass_nms(mlvl_bboxes, mlvl_bboxes_for_nms,
mlvl_scores, mlvl_dir_scores, score_thr,
self.test_cfg.max_per_img)
result.update(dict(sample_idx=input_meta['sample_idx']))
return result
def multiclass_nms(self, mlvl_bboxes, mlvl_bboxes_for_nms, mlvl_scores,
mlvl_dir_scores, score_thr, max_num):
# do multi class nms
# the fg class id range: [0, num_classes-1]
num_classes = mlvl_scores.shape[1] - 1
bboxes = []
scores = []
labels = []
dir_scores = []
for i in range(0, num_classes):
# get bboxes and scores of this class
cls_inds = mlvl_scores[:, i] > score_thr
if not cls_inds.any():
continue
_scores = mlvl_scores[cls_inds, i]
_bboxes_for_nms = mlvl_bboxes_for_nms[cls_inds, :]
if self.test_cfg.use_rotate_nms:
nms_func = nms_gpu
else:
nms_func = nms_normal_gpu
selected = nms_func(_bboxes_for_nms, _scores,
self.test_cfg.nms_thr)
_mlvl_bboxes = mlvl_bboxes[cls_inds, :]
_mlvl_dir_scores = mlvl_dir_scores[cls_inds]
if len(selected) > 0:
bboxes.append(_mlvl_bboxes[selected])
scores.append(_scores[selected])
dir_scores.append(_mlvl_dir_scores[selected])
dir_rot = box_torch_ops.limit_period(
bboxes[-1][..., 6] - self.dir_offset,
self.dir_limit_offset, np.pi)
bboxes[-1][..., 6] = (
dir_rot + self.dir_offset +
np.pi * dir_scores[-1].to(bboxes[-1].dtype))
cls_label = mlvl_bboxes.new_full((len(selected), ),
i,
dtype=torch.long)
labels.append(cls_label)
if bboxes:
bboxes = torch.cat(bboxes, dim=0)
scores = torch.cat(scores, dim=0)
labels = torch.cat(labels, dim=0)
dir_scores = torch.cat(dir_scores, dim=0)
if bboxes.shape[0] > max_num:
_, inds = scores.sort(descending=True)
inds = inds[:max_num]
bboxes = bboxes[inds, :]
labels = labels[inds]
scores = scores[inds]
dir_scores = dir_scores[inds]
return dict(
box3d_lidar=bboxes.cpu(),
scores=scores.cpu(),
label_preds=labels.cpu(),
)
else:
return dict(
box3d_lidar=mlvl_bboxes.new_zeros([0,
self.box_code_size]).cpu(),
scores=mlvl_bboxes.new_zeros([0]).cpu(),
                label_preds=mlvl_bboxes.new_zeros([0]).cpu(),
)
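# A minimal sketch of the direction post-processing in `multiclass_nms` above:
# for the kept boxes of each class, the yaw is first wrapped relative to
# `dir_offset` via limit_period with period pi, and pi is then added back for
# boxes whose direction classifier predicts the opposite heading, recovering a
# full [0, 2 * pi) orientation from the sin-encoded regression output.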
from __future__ import division
import numpy as np
import torch
import torch.nn as nn
from mmcv.cnn import normal_init
from mmdet3d.core import (PseudoSampler, box_torch_ops,
boxes3d_to_bev_torch_lidar, build_anchor_generator,
build_assigner, build_bbox_coder, build_sampler,
multi_apply)
from mmdet3d.ops.iou3d.iou3d_utils import nms_gpu, nms_normal_gpu
from ..builder import build_loss
from ..registry import HEADS
from ..utils import bias_init_with_prob
from .train_mixins import AnchorTrainMixin
@HEADS.register_module
class SECONDHead(nn.Module, AnchorTrainMixin):
"""Anchor-based head (RPN, RetinaNet, SSD, etc.).
Args:
in_channels (int): Number of channels in the input feature map.
feat_channels (int): Number of channels of the feature map.
anchor_scales (Iterable): Anchor scales.
anchor_ratios (Iterable): Anchor aspect ratios.
anchor_strides (Iterable): Anchor strides.
anchor_base_sizes (Iterable): Anchor base sizes.
target_means (Iterable): Mean values of regression targets.
target_stds (Iterable): Std values of regression targets.
loss_cls (dict): Config of classification loss.
loss_bbox (dict): Config of localization loss.
""" # noqa: W605
def __init__(self,
class_name,
in_channels,
train_cfg,
test_cfg,
cache_anchor=False,
feat_channels=256,
use_direction_classifier=True,
encode_bg_as_zeros=False,
box_code_size=7,
anchor_generator=dict(type='AnchorGeneratorRange'),
anchor_range=[0, -39.68, -1.78, 69.12, 39.68, -1.78],
anchor_strides=[2],
anchor_sizes=[[1.6, 3.9, 1.56]],
anchor_rotations=[0, 1.57],
anchor_custom_values=[],
assigner_per_size=False,
assign_per_class=False,
diff_rad_by_sin=True,
dir_offset=0,
dir_limit_offset=1,
target_means=(.0, .0, .0, .0),
target_stds=(1.0, 1.0, 1.0, 1.0),
bbox_coder=dict(type='ResidualCoder'),
loss_cls=dict(
type='CrossEntropyLoss',
use_sigmoid=True,
loss_weight=1.0),
loss_bbox=dict(
type='SmoothL1Loss', beta=1.0 / 9.0, loss_weight=2.0),
loss_dir=dict(type='CrossEntropyLoss', loss_weight=0.2)):
super().__init__()
self.in_channels = in_channels
self.num_classes = len(class_name)
self.feat_channels = feat_channels
self.diff_rad_by_sin = diff_rad_by_sin
self.use_direction_classifier = use_direction_classifier
# self.encode_background_as_zeros = encode_bg_as_zeros
self.box_code_size = box_code_size
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.bbox_coder = build_bbox_coder(bbox_coder)
self.assigner_per_size = assigner_per_size
self.assign_per_class = assign_per_class
self.dir_offset = dir_offset
self.dir_limit_offset = dir_limit_offset
# build target assigner & sampler
if train_cfg is not None:
self.sampling = loss_cls['type'] not in ['FocalLoss', 'GHMC']
if self.sampling:
self.bbox_sampler = build_sampler(train_cfg.sampler)
else:
self.bbox_sampler = PseudoSampler()
if isinstance(train_cfg.assigner, dict):
self.bbox_assigner = build_assigner(train_cfg.assigner)
elif isinstance(train_cfg.assigner, list):
self.bbox_assigner = [
build_assigner(res) for res in train_cfg.assigner
]
# build anchor generator
self.anchor_range = anchor_range
self.anchor_rotations = anchor_rotations
self.anchor_strides = anchor_strides
self.anchor_sizes = anchor_sizes
self.target_means = target_means
self.target_stds = target_stds
self.anchor_generators = []
# In 3D detection, the anchor stride is connected with anchor size
self.num_anchors = (
len(self.anchor_rotations) * len(self.anchor_sizes))
# if len(self.anchor_sizes) != self.anchor_strides:
# # this means different anchor in the same anchor strides
# anchor_sizes = [self.anchor_sizes]
for anchor_stride in self.anchor_strides:
anchor_generator.update(
anchor_ranges=anchor_range,
sizes=self.anchor_sizes,
stride=anchor_stride,
rotations=anchor_rotations,
custom_values=anchor_custom_values,
cache_anchor=cache_anchor)
self.anchor_generators.append(
build_anchor_generator(anchor_generator))
self._init_layers()
self.use_sigmoid_cls = loss_cls.get('use_sigmoid', False)
if not self.use_sigmoid_cls:
self.num_classes += 1
self.loss_cls = build_loss(loss_cls)
self.loss_bbox = build_loss(loss_bbox)
self.loss_dir = build_loss(loss_dir)
self.fp16_enabled = False
def _init_layers(self):
self.cls_out_channels = self.num_anchors * self.num_classes
self.conv_cls = nn.Conv2d(self.feat_channels, self.cls_out_channels, 1)
self.conv_reg = nn.Conv2d(self.feat_channels,
self.num_anchors * self.box_code_size, 1)
if self.use_direction_classifier:
self.conv_dir_cls = nn.Conv2d(self.feat_channels,
self.num_anchors * 2, 1)
def init_weights(self):
bias_cls = bias_init_with_prob(0.01)
normal_init(self.conv_cls, std=0.01, bias=bias_cls)
normal_init(self.conv_reg, std=0.01)
def forward_single(self, x):
cls_score = self.conv_cls(x)
bbox_pred = self.conv_reg(x)
dir_cls_preds = None
if self.use_direction_classifier:
dir_cls_preds = self.conv_dir_cls(x)
return cls_score, bbox_pred, dir_cls_preds
def forward(self, feats):
return multi_apply(self.forward_single, feats)
def get_anchors(self, featmap_sizes, input_metas):
"""Get anchors according to feature map sizes.
Args:
featmap_sizes (list[tuple]): Multi-level feature map sizes.
input_metas (list[dict]): contain pcd and img's meta info.
Returns:
tuple: anchors of each image, valid flags of each image
"""
num_imgs = len(input_metas)
num_levels = len(featmap_sizes)
        # since feature map sizes of all images are the same, we only compute
        # anchors once
multi_level_anchors = []
for i in range(num_levels):
anchors = self.anchor_generators[i].grid_anchors(featmap_sizes[i])
if not self.assigner_per_size:
anchors = anchors.reshape(-1, anchors.size(-1))
multi_level_anchors.append(anchors)
anchor_list = [multi_level_anchors for _ in range(num_imgs)]
return anchor_list
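    # A minimal sketch of the `get_anchors` output (sizes are illustrative):
    # for two samples and a single feature map of size (200, 176), the
    # returned `anchor_list` has length 2; each entry is a list with one
    # tensor of flattened anchors of shape roughly
    # (200 * 176 * num_anchors, box_code_size), where
    # num_anchors = len(anchor_rotations) * len(anchor_sizes).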
def loss_single(self, cls_score, bbox_pred, dir_cls_preds, labels,
label_weights, bbox_targets, bbox_weights, dir_targets,
dir_weights, num_total_samples):
# classification loss
if num_total_samples is None:
num_total_samples = int(cls_score.shape[0])
labels = labels.reshape(-1)
label_weights = label_weights.reshape(-1)
cls_score = cls_score.permute(0, 2, 3, 1).reshape(-1, self.num_classes)
loss_cls = self.loss_cls(
cls_score, labels, label_weights, avg_factor=num_total_samples)
# regression loss
bbox_targets = bbox_targets.reshape(-1, self.box_code_size)
bbox_weights = bbox_weights.reshape(-1, self.box_code_size)
code_weight = self.train_cfg.get('code_weight', None)
if code_weight:
bbox_weights = bbox_weights * bbox_weights.new_tensor(code_weight)
bbox_pred = bbox_pred.permute(0, 2, 3,
1).reshape(-1, self.box_code_size)
if self.diff_rad_by_sin:
bbox_pred, bbox_targets = self.add_sin_difference(
bbox_pred, bbox_targets)
loss_bbox = self.loss_bbox(
bbox_pred,
bbox_targets,
bbox_weights,
avg_factor=num_total_samples)
# direction classification loss
loss_dir = None
if self.use_direction_classifier:
dir_cls_preds = dir_cls_preds.permute(0, 2, 3, 1).reshape(-1, 2)
dir_targets = dir_targets.reshape(-1)
dir_weights = dir_weights.reshape(-1)
loss_dir = self.loss_dir(
dir_cls_preds,
dir_targets,
dir_weights,
avg_factor=num_total_samples)
return loss_cls, loss_bbox, loss_dir
@staticmethod
def add_sin_difference(boxes1, boxes2):
rad_pred_encoding = torch.sin(boxes1[..., -1:]) * torch.cos(
boxes2[..., -1:])
rad_tg_encoding = torch.cos(boxes1[..., -1:]) * torch.sin(boxes2[...,
-1:])
boxes1 = torch.cat([boxes1[..., :-1], rad_pred_encoding], dim=-1)
boxes2 = torch.cat([boxes2[..., :-1], rad_tg_encoding], dim=-1)
return boxes1, boxes2
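    # A minimal sketch of the sin-difference trick used above (values are
    # illustrative): after the transform, subtracting the two rotation
    # channels gives sin(r1) * cos(r2) - cos(r1) * sin(r2) = sin(r1 - r2),
    # so the regression loss on that channel penalizes the rotation
    # difference without a 2 * pi discontinuity.
    #
    #     b1 = torch.tensor([[0., 0., 0., 1., 1., 1., 0.3]])
    #     b2 = torch.tensor([[0., 0., 0., 1., 1., 1., 0.1]])
    #     e1, e2 = SECONDHead.add_sin_difference(b1, b2)
    #     # e1[..., -1] - e2[..., -1] ~= sin(0.2)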
def loss(self,
cls_scores,
bbox_preds,
dir_cls_preds,
gt_bboxes,
gt_labels,
input_metas,
gt_bboxes_ignore=None):
featmap_sizes = [featmap.size()[-2:] for featmap in cls_scores]
assert len(featmap_sizes) == len(self.anchor_generators)
anchor_list = self.get_anchors(featmap_sizes, input_metas)
label_channels = self.cls_out_channels if self.use_sigmoid_cls else 1
cls_reg_targets = self.anchor_target_3d(
anchor_list,
gt_bboxes,
input_metas,
self.target_means,
self.target_stds,
gt_bboxes_ignore_list=gt_bboxes_ignore,
gt_labels_list=gt_labels,
num_classes=self.num_classes,
label_channels=label_channels,
sampling=self.sampling)
if cls_reg_targets is None:
return None
(labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
dir_targets_list, dir_weights_list, num_total_pos,
num_total_neg) = cls_reg_targets
num_total_samples = (
num_total_pos + num_total_neg if self.sampling else num_total_pos)
# num_total_samples = None
losses_cls, losses_bbox, losses_dir = multi_apply(
self.loss_single,
cls_scores,
bbox_preds,
dir_cls_preds,
labels_list,
label_weights_list,
bbox_targets_list,
bbox_weights_list,
dir_targets_list,
dir_weights_list,
num_total_samples=num_total_samples)
return dict(
loss_cls_3d=losses_cls,
loss_bbox_3d=losses_bbox,
loss_dir_3d=losses_dir)
def get_bboxes(self,
cls_scores,
bbox_preds,
dir_cls_preds,
input_metas,
rescale=False):
assert len(cls_scores) == len(bbox_preds)
assert len(cls_scores) == len(dir_cls_preds)
num_levels = len(cls_scores)
mlvl_anchors = [
self.anchor_generators[i].grid_anchors(
cls_scores[i].size()[-2:]).reshape(-1, self.box_code_size)
for i in range(num_levels)
]
result_list = []
for img_id in range(len(input_metas)):
cls_score_list = [
cls_scores[i][img_id].detach() for i in range(num_levels)
]
bbox_pred_list = [
bbox_preds[i][img_id].detach() for i in range(num_levels)
]
dir_cls_pred_list = [
dir_cls_preds[i][img_id].detach() for i in range(num_levels)
]
input_meta = input_metas[img_id]
proposals = self.get_bboxes_single(cls_score_list, bbox_pred_list,
dir_cls_pred_list, mlvl_anchors,
input_meta, rescale)
result_list.append(proposals)
return result_list
def get_bboxes_single(self,
cls_scores,
bbox_preds,
dir_cls_preds,
mlvl_anchors,
input_meta,
rescale=False):
assert len(cls_scores) == len(bbox_preds) == len(mlvl_anchors)
mlvl_bboxes = []
mlvl_scores = []
mlvl_dir_scores = []
mlvl_bboxes_for_nms = []
for cls_score, bbox_pred, dir_cls_pred, anchors in zip(
cls_scores, bbox_preds, dir_cls_preds, mlvl_anchors):
assert cls_score.size()[-2:] == bbox_pred.size()[-2:]
if self.use_direction_classifier:
assert cls_score.size()[-2:] == dir_cls_pred.size()[-2:]
cls_score = cls_score.permute(1, 2,
0).reshape(-1, self.num_classes)
if self.use_sigmoid_cls:
scores = cls_score.sigmoid()
else:
scores = cls_score.softmax(-1)
bbox_pred = bbox_pred.permute(1, 2,
0).reshape(-1, self.box_code_size)
dir_cls_pred = dir_cls_pred.permute(1, 2, 0).reshape(-1, 2)
dir_cls_score = torch.max(dir_cls_pred, dim=-1)[1]
score_thr = self.test_cfg.get('score_thr', 0)
if score_thr > 0:
if self.use_sigmoid_cls:
max_scores, _ = scores.max(dim=1)
else:
max_scores, _ = scores[:, 1:].max(dim=1)
thr_inds = (max_scores >= score_thr)
anchors = anchors[thr_inds]
bbox_pred = bbox_pred[thr_inds]
scores = scores[thr_inds]
                dir_cls_score = dir_cls_score[thr_inds]
bboxes = self.bbox_coder.decode_torch(anchors, bbox_pred,
self.target_means,
self.target_stds)
bboxes_for_nms = boxes3d_to_bev_torch_lidar(bboxes)
mlvl_bboxes_for_nms.append(bboxes_for_nms)
mlvl_bboxes.append(bboxes)
mlvl_scores.append(scores)
            mlvl_dir_scores.append(dir_cls_score)
mlvl_bboxes = torch.cat(mlvl_bboxes)
mlvl_bboxes_for_nms = torch.cat(mlvl_bboxes_for_nms)
mlvl_scores = torch.cat(mlvl_scores)
mlvl_dir_scores = torch.cat(mlvl_dir_scores)
if len(mlvl_scores) > 0:
mlvl_scores, mlvl_label_preds = mlvl_scores.max(dim=-1)
if self.test_cfg.use_rotate_nms:
nms_func = nms_gpu
else:
nms_func = nms_normal_gpu
selected = nms_func(mlvl_bboxes_for_nms, mlvl_scores,
self.test_cfg.nms_thr)
else:
selected = []
if len(selected) > 0:
selected_bboxes = mlvl_bboxes[selected]
selected_scores = mlvl_scores[selected]
selected_label_preds = mlvl_label_preds[selected]
selected_dir_scores = mlvl_dir_scores[selected]
dir_rot = box_torch_ops.limit_period(
selected_bboxes[..., -1] - self.dir_offset,
self.dir_limit_offset, np.pi)
selected_bboxes[..., -1] = (
dir_rot + self.dir_offset +
np.pi * selected_dir_scores.to(selected_bboxes.dtype))
return dict(
box3d_lidar=selected_bboxes.cpu(),
scores=selected_scores.cpu(),
label_preds=selected_label_preds.cpu(),
sample_idx=input_meta['sample_idx'],
)
return dict(
box3d_lidar=mlvl_scores.new_zeros([0, 7]).cpu(),
scores=mlvl_scores.new_zeros([0]).cpu(),
            label_preds=mlvl_scores.new_zeros([0]).cpu(),
sample_idx=input_meta['sample_idx'],
)
import numpy as np
import torch
from mmdet3d.core import box_torch_ops, images_to_levels, multi_apply
class AnchorTrainMixin(object):
def anchor_target_3d(self,
anchor_list,
gt_bboxes_list,
input_metas,
target_means,
target_stds,
gt_bboxes_ignore_list=None,
gt_labels_list=None,
label_channels=1,
num_classes=1,
sampling=True):
"""Compute regression and classification targets for anchors.
Args:
anchor_list (list[list]): Multi level anchors of each image.
gt_bboxes_list (list[Tensor]): Ground truth bboxes of each image.
            input_metas (list[dict]): Meta info of each sample.
target_means (Iterable): Mean value of regression targets.
target_stds (Iterable): Std value of regression targets.
Returns:
tuple
"""
num_imgs = len(input_metas)
assert len(anchor_list) == num_imgs
# anchor number of multi levels
num_level_anchors = [
anchors.view(-1, self.box_code_size).size(0)
for anchors in anchor_list[0]
]
# concat all level anchors and flags to a single tensor
for i in range(num_imgs):
anchor_list[i] = torch.cat(anchor_list[i])
# compute targets for each image
if gt_bboxes_ignore_list is None:
gt_bboxes_ignore_list = [None for _ in range(num_imgs)]
if gt_labels_list is None:
gt_labels_list = [None for _ in range(num_imgs)]
(all_labels, all_label_weights, all_bbox_targets, all_bbox_weights,
all_dir_targets, all_dir_weights, pos_inds_list,
neg_inds_list) = multi_apply(
self.anchor_target_3d_single,
anchor_list,
gt_bboxes_list,
gt_bboxes_ignore_list,
gt_labels_list,
input_metas,
target_means=target_means,
target_stds=target_stds,
label_channels=label_channels,
num_classes=num_classes,
sampling=sampling)
# no valid anchors
if any([labels is None for labels in all_labels]):
return None
# sampled anchors of all images
num_total_pos = sum([max(inds.numel(), 1) for inds in pos_inds_list])
num_total_neg = sum([max(inds.numel(), 1) for inds in neg_inds_list])
# split targets to a list w.r.t. multiple levels
labels_list = images_to_levels(all_labels, num_level_anchors)
label_weights_list = images_to_levels(all_label_weights,
num_level_anchors)
bbox_targets_list = images_to_levels(all_bbox_targets,
num_level_anchors)
bbox_weights_list = images_to_levels(all_bbox_weights,
num_level_anchors)
dir_targets_list = images_to_levels(all_dir_targets, num_level_anchors)
dir_weights_list = images_to_levels(all_dir_weights, num_level_anchors)
return (labels_list, label_weights_list, bbox_targets_list,
bbox_weights_list, dir_targets_list, dir_weights_list,
num_total_pos, num_total_neg)
def anchor_target_3d_single(self,
anchors,
gt_bboxes,
gt_bboxes_ignore,
gt_labels,
input_meta,
target_means,
target_stds,
label_channels=1,
num_classes=1,
sampling=True):
if isinstance(self.bbox_assigner, list):
feat_size = anchors.size(0) * anchors.size(1) * anchors.size(2)
rot_angles = anchors.size(-2)
assert len(self.bbox_assigner) == anchors.size(-3)
(total_labels, total_label_weights, total_bbox_targets,
total_bbox_weights, total_dir_targets, total_dir_weights,
total_pos_inds, total_neg_inds) = [], [], [], [], [], [], [], []
current_anchor_num = 0
for i, assigner in enumerate(self.bbox_assigner):
current_anchors = anchors[..., i, :, :].reshape(
-1, self.box_code_size)
current_anchor_num += current_anchors.size(0)
if self.assign_per_class:
gt_per_cls = (gt_labels == i)
anchor_targets = self.anchor_target_single_assigner(
assigner, current_anchors, gt_bboxes[gt_per_cls, :],
gt_bboxes_ignore, gt_labels[gt_per_cls], input_meta,
target_means, target_stds, label_channels, num_classes,
sampling)
else:
anchor_targets = self.anchor_target_single_assigner(
assigner, current_anchors, gt_bboxes, gt_bboxes_ignore,
gt_labels, input_meta, target_means, target_stds,
label_channels, num_classes, sampling)
(labels, label_weights, bbox_targets, bbox_weights,
dir_targets, dir_weights, pos_inds, neg_inds) = anchor_targets
total_labels.append(labels.reshape(feat_size, 1, rot_angles))
total_label_weights.append(
label_weights.reshape(feat_size, 1, rot_angles))
total_bbox_targets.append(
bbox_targets.reshape(feat_size, 1, rot_angles,
anchors.size(-1)))
total_bbox_weights.append(
bbox_weights.reshape(feat_size, 1, rot_angles,
anchors.size(-1)))
total_dir_targets.append(
dir_targets.reshape(feat_size, 1, rot_angles))
total_dir_weights.append(
dir_weights.reshape(feat_size, 1, rot_angles))
total_pos_inds.append(pos_inds)
total_neg_inds.append(neg_inds)
total_labels = torch.cat(total_labels, dim=-2).reshape(-1)
total_label_weights = torch.cat(
total_label_weights, dim=-2).reshape(-1)
total_bbox_targets = torch.cat(
total_bbox_targets, dim=-3).reshape(-1, anchors.size(-1))
total_bbox_weights = torch.cat(
total_bbox_weights, dim=-3).reshape(-1, anchors.size(-1))
total_dir_targets = torch.cat(
total_dir_targets, dim=-2).reshape(-1)
total_dir_weights = torch.cat(
total_dir_weights, dim=-2).reshape(-1)
total_pos_inds = torch.cat(total_pos_inds, dim=0).reshape(-1)
total_neg_inds = torch.cat(total_neg_inds, dim=0).reshape(-1)
return (total_labels, total_label_weights, total_bbox_targets,
total_bbox_weights, total_dir_targets, total_dir_weights,
total_pos_inds, total_neg_inds)
else:
return self.anchor_target_single_assigner(
self.bbox_assigner, anchors, gt_bboxes, gt_bboxes_ignore,
gt_labels, input_meta, target_means, target_stds,
label_channels, num_classes, sampling)
def anchor_target_single_assigner(self,
bbox_assigner,
anchors,
gt_bboxes,
gt_bboxes_ignore,
gt_labels,
input_meta,
target_means,
target_stds,
label_channels=1,
num_classes=1,
sampling=True):
anchors = anchors.reshape(-1, anchors.size(-1))
num_valid_anchors = anchors.shape[0]
bbox_targets = torch.zeros_like(anchors)
bbox_weights = torch.zeros_like(anchors)
dir_targets = anchors.new_zeros((anchors.shape[0]), dtype=torch.long)
dir_weights = anchors.new_zeros((anchors.shape[0]), dtype=torch.float)
labels = anchors.new_zeros(num_valid_anchors, dtype=torch.long)
label_weights = anchors.new_zeros(num_valid_anchors, dtype=torch.float)
if len(gt_bboxes) > 0:
assign_result = bbox_assigner.assign(anchors, gt_bboxes,
gt_bboxes_ignore, gt_labels)
sampling_result = self.bbox_sampler.sample(assign_result, anchors,
gt_bboxes)
pos_inds = sampling_result.pos_inds
neg_inds = sampling_result.neg_inds
else:
pos_inds = torch.nonzero(
anchors.new_zeros((anchors.shape[0], ), dtype=torch.long) > 0
).squeeze(-1).unique()
neg_inds = torch.nonzero(
anchors.new_zeros((anchors.shape[0], ), dtype=torch.long) ==
0).squeeze(-1).unique()
if gt_labels is not None:
labels += num_classes
if len(pos_inds) > 0:
pos_bbox_targets = self.bbox_coder.encode_torch(
sampling_result.pos_bboxes, sampling_result.pos_gt_bboxes,
target_means, target_stds)
pos_dir_targets = get_direction_target(
sampling_result.pos_bboxes,
pos_bbox_targets,
self.dir_offset,
one_hot=False)
bbox_targets[pos_inds, :] = pos_bbox_targets
bbox_weights[pos_inds, :] = 1.0
dir_targets[pos_inds] = pos_dir_targets
dir_weights[pos_inds] = 1.0
if gt_labels is None:
labels[pos_inds] = 1
else:
labels[pos_inds] = gt_labels[
sampling_result.pos_assigned_gt_inds]
if self.train_cfg.pos_weight <= 0:
label_weights[pos_inds] = 1.0
else:
label_weights[pos_inds] = self.train_cfg.pos_weight
if len(neg_inds) > 0:
label_weights[neg_inds] = 1.0
return (labels, label_weights, bbox_targets, bbox_weights, dir_targets,
dir_weights, pos_inds, neg_inds)
def get_direction_target(anchors,
reg_targets,
dir_offset=0,
num_bins=2,
one_hot=True):
rot_gt = reg_targets[..., 6] + anchors[..., 6]
offset_rot = box_torch_ops.limit_period(rot_gt - dir_offset, 0, 2 * np.pi)
dir_cls_targets = torch.floor(offset_rot / (2 * np.pi / num_bins)).long()
dir_cls_targets = torch.clamp(dir_cls_targets, min=0, max=num_bins - 1)
if one_hot:
dir_targets = torch.zeros(
*list(dir_cls_targets.shape),
num_bins,
dtype=anchors.dtype,
device=dir_cls_targets.device)
        dir_targets.scatter_(-1, dir_cls_targets.unsqueeze(dim=-1).long(),
                             1.0)
dir_cls_targets = dir_targets
return dir_cls_targets
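# A minimal sketch of how `get_direction_target` bins the yaw (values are
# illustrative): the decoded ground-truth yaw is anchors[..., 6] +
# reg_targets[..., 6]; it is wrapped into [0, 2 * pi) relative to
# `dir_offset` and split into `num_bins` equal bins, so with the default
# num_bins=2 a yaw in [0, pi) maps to direction class 0 and a yaw in
# [pi, 2 * pi) maps to class 1.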
from mmdet.models.backbones import SSDVGG, HRNet, ResNet, ResNetV1d, ResNeXt
from .second import SECOND
__all__ = ['ResNet', 'ResNetV1d', 'ResNeXt', 'SSDVGG', 'HRNet', 'SECOND']
from functools import partial
import torch.nn as nn
from mmcv.runner import load_checkpoint
from ..registry import BACKBONES
from ..utils import build_norm_layer
class Empty(nn.Module):
def __init__(self, *args, **kwargs):
super(Empty, self).__init__()
def forward(self, *args, **kwargs):
if len(args) == 1:
return args[0]
elif len(args) == 0:
return None
return args
@BACKBONES.register_module
class SECOND(nn.Module):
"""Compare with RPN, RPNV2 support arbitrary number of stage.
"""
def __init__(self,
in_channels=128,
layer_nums=[3, 5, 5],
layer_strides=[2, 2, 2],
num_filters=[128, 128, 256],
norm_cfg=dict(type='BN', eps=1e-3, momentum=0.01)):
super(SECOND, self).__init__()
assert len(layer_strides) == len(layer_nums)
assert len(num_filters) == len(layer_nums)
if norm_cfg is not None:
Conv2d = partial(nn.Conv2d, bias=False)
else:
Conv2d = partial(nn.Conv2d, bias=True)
in_filters = [in_channels, *num_filters[:-1]]
        # Note: when stride > 1, conv2d with 'same' padding is not equivalent
        # to explicit zero-padding followed by conv2d, so we pad explicitly.
blocks = []
for i, layer_num in enumerate(layer_nums):
norm_layer = (
build_norm_layer(norm_cfg, num_filters[i])[1]
if norm_cfg is not None else Empty)
block = [
nn.ZeroPad2d(1),
Conv2d(
in_filters[i], num_filters[i], 3, stride=layer_strides[i]),
norm_layer,
nn.ReLU(inplace=True),
]
for j in range(layer_num):
norm_layer = (
build_norm_layer(norm_cfg, num_filters[i])[1]
if norm_cfg is not None else Empty)
block.append(
Conv2d(num_filters[i], num_filters[i], 3, padding=1))
block.append(norm_layer)
block.append(nn.ReLU(inplace=True))
block = nn.Sequential(*block)
blocks.append(block)
self.blocks = nn.ModuleList(blocks)
def init_weights(self, pretrained=None):
if isinstance(pretrained, str):
from mmdet3d.apis import get_root_logger
logger = get_root_logger()
load_checkpoint(self, pretrained, strict=False, logger=logger)
def forward(self, x):
outs = []
for i in range(len(self.blocks)):
x = self.blocks[i](x)
outs.append(x)
return tuple(outs)
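# A minimal smoke-test sketch for the SECOND backbone above (shapes are
# illustrative; intended to be run manually, not on import):
#
#     import torch
#     backbone = SECOND(in_channels=128)
#     feats = backbone(torch.rand(1, 128, 200, 176))
#     # `feats` is a tuple with one map per block; with the default strides
#     # [2, 2, 2] the shapes are (1, 128, 100, 88), (1, 128, 50, 44) and
#     # (1, 256, 25, 22).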
from mmdet.models.bbox_heads import (BBoxHead, ConvFCBBoxHead,
DoubleConvFCBBoxHead, Shared2FCBBoxHead,
Shared4Conv1FCBBoxHead)
__all__ = [
'BBoxHead', 'ConvFCBBoxHead', 'Shared2FCBBoxHead',
'Shared4Conv1FCBBoxHead', 'DoubleConvFCBBoxHead'
]
from torch import nn
from mmdet.models.registry import (BACKBONES, DETECTORS, HEADS, LOSSES, NECKS,
ROI_EXTRACTORS, SHARED_HEADS)
from ..utils import build_from_cfg
from .registry import FUSION_LAYERS, MIDDLE_ENCODERS, VOXEL_ENCODERS
def build(cfg, registry, default_args=None):
if isinstance(cfg, list):
modules = [
build_from_cfg(cfg_, registry, default_args) for cfg_ in cfg
]
return nn.Sequential(*modules)
else:
return build_from_cfg(cfg, registry, default_args)
def build_backbone(cfg):
return build(cfg, BACKBONES)
def build_neck(cfg):
return build(cfg, NECKS)
def build_roi_extractor(cfg):
return build(cfg, ROI_EXTRACTORS)
def build_shared_head(cfg):
return build(cfg, SHARED_HEADS)
def build_head(cfg):
return build(cfg, HEADS)
def build_loss(cfg):
return build(cfg, LOSSES)
def build_detector(cfg, train_cfg=None, test_cfg=None):
return build(cfg, DETECTORS, dict(train_cfg=train_cfg, test_cfg=test_cfg))
def build_voxel_encoder(cfg):
return build(cfg, VOXEL_ENCODERS)
def build_middle_encoder(cfg):
return build(cfg, MIDDLE_ENCODERS)
def build_fusion_layer(cfg):
return build(cfg, FUSION_LAYERS)
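# A minimal usage sketch of the builders above (the config values are
# illustrative, not a verified model config):
#
#     backbone = build_backbone(
#         dict(type='SECOND', in_channels=128, layer_nums=[3, 5, 5]))
#     # Passing a list of configs wraps the built modules in nn.Sequential:
#     # neck = build_neck([neck_cfg_a, neck_cfg_b])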
from .base import BaseDetector
from .mvx_faster_rcnn import (DynamicMVXFasterRCNN, DynamicMVXFasterRCNNV2,
DynamicMVXFasterRCNNV3)
from .mvx_single_stage import MVXSingleStageDetector
from .mvx_two_stage import MVXTwoStageDetector
from .single_stage import SingleStageDetector
from .two_stage import TwoStageDetector
from .voxelnet import DynamicVoxelNet, VoxelNet
__all__ = [
'BaseDetector', 'SingleStageDetector', 'VoxelNet', 'DynamicVoxelNet',
'TwoStageDetector', 'MVXSingleStageDetector', 'MVXTwoStageDetector',
'DynamicMVXFasterRCNN', 'DynamicMVXFasterRCNNV2', 'DynamicMVXFasterRCNNV3'
]
from abc import ABCMeta, abstractmethod
import torch.nn as nn
class BaseDetector(nn.Module, metaclass=ABCMeta):
"""Base class for detectors"""
def __init__(self):
super(BaseDetector, self).__init__()
self.fp16_enabled = False
@property
def with_neck(self):
return hasattr(self, 'neck') and self.neck is not None
@property
def with_voxel_encoder(self):
return hasattr(self,
'voxel_encoder') and self.voxel_encoder is not None
@property
def with_middle_encoder(self):
return hasattr(self,
'middle_encoder') and self.middle_encoder is not None
@property
def with_shared_head(self):
return hasattr(self, 'shared_head') and self.shared_head is not None
@property
def with_bbox(self):
return hasattr(self, 'bbox_head') and self.bbox_head is not None
@property
def with_mask(self):
return hasattr(self, 'mask_head') and self.mask_head is not None
@abstractmethod
def extract_feat(self, imgs):
pass
def extract_feats(self, imgs):
assert isinstance(imgs, list)
for img in imgs:
yield self.extract_feat(img)
@abstractmethod
def forward_train(self, **kwargs):
pass
@abstractmethod
def simple_test(self, **kwargs):
pass
@abstractmethod
def aug_test(self, **kwargs):
pass
def init_weights(self, pretrained=None):
if pretrained is not None:
from mmdet3d.apis import get_root_logger
logger = get_root_logger()
logger.info('load model from: {}'.format(pretrained))
def forward_test(self, imgs, img_metas, **kwargs):
"""
Args:
imgs (List[Tensor]): the outer list indicates test-time
augmentations and inner Tensor should have a shape NxCxHxW,
which contains all images in the batch.
            img_metas (List[List[dict]]): the outer list indicates test-time
augs (multiscale, flip, etc.) and the inner list indicates
images in a batch
"""
for var, name in [(imgs, 'imgs'), (img_metas, 'img_metas')]:
if not isinstance(var, list):
raise TypeError('{} must be a list, but got {}'.format(
name, type(var)))
num_augs = len(imgs)
if num_augs != len(img_metas):
raise ValueError(
'num of augmentations ({}) != num of image meta ({})'.format(
len(imgs), len(img_metas)))
# TODO: remove the restriction of imgs_per_gpu == 1 when prepared
imgs_per_gpu = imgs[0].size(0)
assert imgs_per_gpu == 1
if num_augs == 1:
return self.simple_test(imgs[0], img_metas[0], **kwargs)
else:
return self.aug_test(imgs, img_metas, **kwargs)
def forward(self, img, img_meta, return_loss=True, **kwargs):
"""
Calls either forward_train or forward_test depending on whether
return_loss=True. Note this setting will change the expected inputs.
When `return_loss=True`, img and img_meta are single-nested (i.e.
        Tensor and List[dict]), and when `return_loss=False`, img and img_meta
should be double nested (i.e. List[Tensor], List[List[dict]]), with
the outer list indicating test time augmentations.
"""
# TODO: current version only support 2D detector now, find
# a better way to be compatible with both
if return_loss:
return self.forward_train(img, img_meta, **kwargs)
else:
return self.forward_test(img, img_meta, **kwargs)
import torch
import torch.nn.functional as F
from mmdet.models.registry import DETECTORS
from .mvx_two_stage import MVXTwoStageDetector
@DETECTORS.register_module
class DynamicMVXFasterRCNN(MVXTwoStageDetector):
def __init__(self, **kwargs):
super(DynamicMVXFasterRCNN, self).__init__(**kwargs)
def extract_pts_feat(self, points, img_feats, img_meta):
if not self.with_pts_bbox:
return None
voxels, coors = self.voxelize(points)
# adopt an early fusion strategy
if self.with_fusion:
voxels = self.pts_fusion_layer(img_feats, points, voxels, img_meta)
voxel_features, feature_coors = self.pts_voxel_encoder(voxels, coors)
batch_size = coors[-1, 0] + 1
x = self.pts_middle_encoder(voxel_features, feature_coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x)
return x
@torch.no_grad()
def voxelize(self, points):
coors = []
        # dynamic voxelization only provides a coordinate (coors) mapping
for res in points:
res_coors = self.pts_voxel_layer(res)
coors.append(res_coors)
points = torch.cat(points, dim=0)
coors_batch = []
for i, coor in enumerate(coors):
coor_pad = F.pad(coor, (1, 0), mode='constant', value=i)
coors_batch.append(coor_pad)
coors_batch = torch.cat(coors_batch, dim=0)
return points, coors_batch
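# A minimal sketch of the batch-index padding done in `voxelize` above: each
# per-sample coordinate tensor of shape (M_i, 3) has its sample index i
# prepended as an extra column via F.pad, so the concatenated `coors_batch`
# has shape (sum(M_i), 4) with coors_batch[:, 0] holding the batch index.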
@DETECTORS.register_module
class DynamicMVXFasterRCNNV2(DynamicMVXFasterRCNN):
def __init__(self, **kwargs):
super(DynamicMVXFasterRCNNV2, self).__init__(**kwargs)
def extract_pts_feat(self, points, img_feats, img_meta):
if not self.with_pts_bbox:
return None
voxels, coors = self.voxelize(points)
voxel_features, feature_coors = self.pts_voxel_encoder(
voxels, coors, points, img_feats, img_meta)
batch_size = coors[-1, 0] + 1
x = self.pts_middle_encoder(voxel_features, feature_coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x)
return x
@DETECTORS.register_module
class MVXFasterRCNNV2(MVXTwoStageDetector):
def __init__(self, **kwargs):
super(MVXFasterRCNNV2, self).__init__(**kwargs)
def extract_pts_feat(self, pts, img_feats, img_meta):
if not self.with_pts_bbox:
return None
voxels, num_points, coors = self.voxelize(pts)
voxel_features = self.pts_voxel_encoder(voxels, num_points, coors,
img_feats, img_meta)
batch_size = coors[-1, 0] + 1
x = self.pts_middle_encoder(voxel_features, coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x)
return x
@DETECTORS.register_module
class DynamicMVXFasterRCNNV3(DynamicMVXFasterRCNN):
def __init__(self, **kwargs):
super(DynamicMVXFasterRCNNV3, self).__init__(**kwargs)
def extract_pts_feat(self, points, img_feats, img_meta):
if not self.with_pts_bbox:
return None
voxels, coors = self.voxelize(points)
voxel_features, feature_coors = self.pts_voxel_encoder(voxels, coors)
batch_size = coors[-1, 0] + 1
x = self.pts_middle_encoder(voxel_features, feature_coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x, coors, points, img_feats, img_meta)
return x
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmdet3d.ops import Voxelization
from mmdet.models.registry import DETECTORS
from .. import builder
from .base import BaseDetector
@DETECTORS.register_module
class MVXSingleStageDetector(BaseDetector):
def __init__(self,
voxel_layer,
voxel_encoder,
middle_encoder,
fusion_layer,
img_backbone,
pts_backbone,
img_neck=None,
pts_neck=None,
pts_bbox_head=None,
img_bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(MVXSingleStageDetector, self).__init__()
self.voxel_layer = Voxelization(**voxel_layer)
self.voxel_encoder = builder.build_voxel_encoder(voxel_encoder)
self.middle_encoder = builder.build_middle_encoder(middle_encoder)
self.pts_backbone = builder.build_backbone(pts_backbone)
if fusion_layer:
self.fusion_layer = builder.build_fusion_layer(fusion_layer)
if img_backbone:
self.img_backbone = builder.build_backbone(img_backbone)
pts_bbox_head.update(train_cfg=train_cfg)
pts_bbox_head.update(test_cfg=test_cfg)
self.pts_bbox_head = builder.build_head(pts_bbox_head)
if img_neck is not None:
self.img_neck = builder.build_neck(img_neck)
if pts_neck is not None:
self.pts_neck = builder.build_neck(pts_neck)
if img_bbox_head is not None:
self.img_bbox_head = builder.build_head(img_bbox_head)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.init_weights(pretrained=pretrained)
def init_weights(self, pretrained=None):
super(MVXSingleStageDetector, self).init_weights(pretrained)
if self.with_img_backbone:
self.img_backbone.init_weights(pretrained=pretrained)
if self.with_img_neck:
if isinstance(self.img_neck, nn.Sequential):
for m in self.img_neck:
m.init_weights()
else:
self.img_neck.init_weights()
if self.with_img_bbox:
self.img_bbox_head.init_weights()
if self.with_pts_bbox:
self.pts_bbox_head.init_weights()
@property
def with_pts_bbox(self):
return hasattr(self,
'pts_bbox_head') and self.pts_bbox_head is not None
@property
def with_img_bbox(self):
return hasattr(self,
'img_bbox_head') and self.img_bbox_head is not None
@property
def with_img_backbone(self):
return hasattr(self, 'img_backbone') and self.img_backbone is not None
@property
def with_fusion(self):
return hasattr(self, 'fusion_layer') and self.fusion_layer is not None
@property
def with_img_neck(self):
return hasattr(self, 'img_neck') and self.img_neck is not None
@property
def with_pts_neck(self):
return hasattr(self, 'pts_neck') and self.pts_neck is not None
def extract_feat(self, points, img, img_meta):
if self.with_img_backbone:
img_feats = self.img_backbone(img)
if self.with_img_neck:
img_feats = self.img_neck(img_feats)
voxels, num_points, coors = self.voxelize(points)
voxel_features = self.voxel_encoder(voxels, num_points, coors)
batch_size = coors[-1, 0] + 1
x = self.middle_encoder(voxel_features, coors, batch_size)
x = self.pts_backbone(x)
if self.with_neck:
x = self.pts_neck(x)
return x
@torch.no_grad()
def voxelize(self, points):
voxels, coors, num_points = [], [], []
for res in points:
res_voxels, res_coors, res_num_points = self.voxel_layer(res)
voxels.append(res_voxels)
coors.append(res_coors)
num_points.append(res_num_points)
voxels = torch.cat(voxels, dim=0)
num_points = torch.cat(num_points, dim=0)
coors_batch = []
for i, coor in enumerate(coors):
coor_pad = F.pad(coor, (1, 0), mode='constant', value=i)
coors_batch.append(coor_pad)
coors_batch = torch.cat(coors_batch, dim=0)
return voxels, num_points, coors_batch
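    # Note: unlike the dynamic variant, the hard `voxelize` above returns
    # dense voxel tensors plus the per-voxel point counts; the batch index
    # is prepended to `coors` in the same way, giving (num_voxels, 4)
    # coordinates consumed by the voxel encoder and middle encoder.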
def forward_train(self,
points,
img_meta,
gt_bboxes_3d,
gt_labels,
img=None,
gt_bboxes_ignore=None):
x = self.extract_feat(points, img=img, img_meta=img_meta)
outs = self.pts_bbox_head(x)
loss_inputs = outs + (gt_bboxes_3d, gt_labels, img_meta)
losses = self.pts_bbox_head.loss(
*loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
return losses
def forward_test(self, **kwargs):
return self.simple_test(**kwargs)
def forward(self, return_loss=True, **kwargs):
if return_loss:
return self.forward_train(**kwargs)
else:
return self.forward_test(**kwargs)
def simple_test(self,
points,
img_meta,
img=None,
gt_bboxes_3d=None,
rescale=False):
x = self.extract_feat(points, img, img_meta)
outs = self.pts_bbox_head(x)
bbox_inputs = outs + (img_meta, rescale)
bbox_list = self.pts_bbox_head.get_bboxes(*bbox_inputs)
return bbox_list
def aug_test(self, points, imgs, img_metas, rescale=False):
raise NotImplementedError
@DETECTORS.register_module
class DynamicMVXNet(MVXSingleStageDetector):
def __init__(self,
voxel_layer,
voxel_encoder,
middle_encoder,
pts_backbone,
fusion_layer=None,
img_backbone=None,
img_neck=None,
pts_neck=None,
pts_bbox_head=None,
img_bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(DynamicMVXNet, self).__init__(
voxel_layer=voxel_layer,
voxel_encoder=voxel_encoder,
middle_encoder=middle_encoder,
img_backbone=img_backbone,
fusion_layer=fusion_layer,
pts_backbone=pts_backbone,
pts_neck=pts_neck,
img_neck=img_neck,
img_bbox_head=img_bbox_head,
pts_bbox_head=pts_bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained,
)
def extract_feat(self, points, img, img_meta):
if self.with_img_backbone:
img_feats = self.img_backbone(img)
if self.with_img_neck:
img_feats = self.img_neck(img_feats)
voxels, coors = self.voxelize(points)
# adopt an early fusion strategy
if self.with_fusion:
voxels = self.fusion_layer(img_feats, points, voxels, img_meta)
voxel_features, feature_coors = self.voxel_encoder(voxels, coors)
batch_size = coors[-1, 0] + 1
x = self.middle_encoder(voxel_features, feature_coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x)
return x
@torch.no_grad()
def voxelize(self, points):
coors = []
        # dynamic voxelization only provides a coordinate (coors) mapping
for res in points:
res_coors = self.voxel_layer(res)
coors.append(res_coors)
points = torch.cat(points, dim=0)
coors_batch = []
for i, coor in enumerate(coors):
coor_pad = F.pad(coor, (1, 0), mode='constant', value=i)
coors_batch.append(coor_pad)
coors_batch = torch.cat(coors_batch, dim=0)
return points, coors_batch
@DETECTORS.register_module
class DynamicMVXNetV2(DynamicMVXNet):
def __init__(self,
voxel_layer,
voxel_encoder,
middle_encoder,
pts_backbone,
fusion_layer=None,
img_backbone=None,
img_neck=None,
pts_neck=None,
pts_bbox_head=None,
img_bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(DynamicMVXNetV2, self).__init__(
voxel_layer=voxel_layer,
voxel_encoder=voxel_encoder,
middle_encoder=middle_encoder,
img_backbone=img_backbone,
fusion_layer=fusion_layer,
pts_backbone=pts_backbone,
pts_neck=pts_neck,
img_neck=img_neck,
img_bbox_head=img_bbox_head,
pts_bbox_head=pts_bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained,
)
def extract_feat(self, points, img, img_meta):
if self.with_img_backbone:
img_feats = self.img_backbone(img)
if self.with_img_neck:
img_feats = self.img_neck(img_feats)
voxels, coors = self.voxelize(points)
voxel_features, feature_coors = self.voxel_encoder(
voxels, coors, points, img_feats, img_meta)
batch_size = coors[-1, 0] + 1
x = self.middle_encoder(voxel_features, feature_coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x)
return x
@DETECTORS.register_module
class DynamicMVXNetV3(DynamicMVXNet):
def __init__(self,
voxel_layer,
voxel_encoder,
middle_encoder,
pts_backbone,
fusion_layer=None,
img_backbone=None,
img_neck=None,
pts_neck=None,
pts_bbox_head=None,
img_bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(DynamicMVXNetV3, self).__init__(
voxel_layer=voxel_layer,
voxel_encoder=voxel_encoder,
middle_encoder=middle_encoder,
img_backbone=img_backbone,
fusion_layer=fusion_layer,
pts_backbone=pts_backbone,
pts_neck=pts_neck,
img_neck=img_neck,
img_bbox_head=img_bbox_head,
pts_bbox_head=pts_bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained,
)
def extract_feat(self, points, img, img_meta):
if self.with_img_backbone:
img_feats = self.img_backbone(img)
if self.with_img_neck:
img_feats = self.img_neck(img_feats)
voxels, coors = self.voxelize(points)
voxel_features, feature_coors = self.voxel_encoder(voxels, coors)
batch_size = coors[-1, 0] + 1
x = self.middle_encoder(voxel_features, feature_coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x, coors, points, img_feats, img_meta)
return x
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmdet3d.core import (bbox2result_coco, bbox2roi, build_assigner,
build_sampler)
from mmdet3d.ops import Voxelization
from mmdet.models.registry import DETECTORS
from .. import builder
from .base import BaseDetector
from .test_mixins import BBoxTestMixin, RPNTestMixin
@DETECTORS.register_module
class MVXTwoStageDetector(BaseDetector, RPNTestMixin, BBoxTestMixin):
def __init__(self,
pts_voxel_layer=None,
pts_voxel_encoder=None,
pts_middle_encoder=None,
pts_fusion_layer=None,
img_backbone=None,
pts_backbone=None,
img_neck=None,
pts_neck=None,
pts_bbox_head=None,
img_bbox_head=None,
img_shared_head=None,
img_rpn_head=None,
img_bbox_roi_extractor=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(MVXTwoStageDetector, self).__init__()
if pts_voxel_layer:
self.pts_voxel_layer = Voxelization(**pts_voxel_layer)
if pts_voxel_encoder:
self.pts_voxel_encoder = builder.build_voxel_encoder(
pts_voxel_encoder)
if pts_middle_encoder:
self.pts_middle_encoder = builder.build_middle_encoder(
pts_middle_encoder)
if pts_backbone:
self.pts_backbone = builder.build_backbone(pts_backbone)
if pts_fusion_layer:
self.pts_fusion_layer = builder.build_fusion_layer(
pts_fusion_layer)
if pts_neck is not None:
self.pts_neck = builder.build_neck(pts_neck)
if pts_bbox_head:
pts_train_cfg = train_cfg.pts if train_cfg else None
pts_bbox_head.update(train_cfg=pts_train_cfg)
pts_test_cfg = test_cfg.pts if test_cfg else None
pts_bbox_head.update(test_cfg=pts_test_cfg)
self.pts_bbox_head = builder.build_head(pts_bbox_head)
if img_backbone:
self.img_backbone = builder.build_backbone(img_backbone)
if img_neck is not None:
self.img_neck = builder.build_neck(img_neck)
if img_shared_head is not None:
self.img_shared_head = builder.build_shared_head(img_shared_head)
if img_rpn_head is not None:
self.img_rpn_head = builder.build_head(img_rpn_head)
if img_bbox_head is not None:
self.img_bbox_roi_extractor = builder.build_roi_extractor(
img_bbox_roi_extractor)
self.img_bbox_head = builder.build_head(img_bbox_head)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.init_weights(pretrained=pretrained)
def init_weights(self, pretrained=None):
super(MVXTwoStageDetector, self).init_weights(pretrained)
if self.with_img_backbone:
self.img_backbone.init_weights(pretrained=pretrained)
if self.with_img_neck:
if isinstance(self.img_neck, nn.Sequential):
for m in self.img_neck:
m.init_weights()
else:
self.img_neck.init_weights()
if self.with_shared_head:
self.img_shared_head.init_weights(pretrained=pretrained)
if self.with_img_rpn:
self.img_rpn_head.init_weights()
if self.with_img_bbox:
self.img_bbox_roi_extractor.init_weights()
self.img_bbox_head.init_weights()
if self.with_pts_bbox:
self.pts_bbox_head.init_weights()
@property
def with_img_shared_head(self):
return hasattr(self,
'img_shared_head') and self.img_shared_head is not None
@property
def with_pts_bbox(self):
return hasattr(self,
'pts_bbox_head') and self.pts_bbox_head is not None
@property
def with_img_bbox(self):
return hasattr(self,
'img_bbox_head') and self.img_bbox_head is not None
@property
def with_img_backbone(self):
return hasattr(self, 'img_backbone') and self.img_backbone is not None
@property
def with_fusion(self):
        return hasattr(
            self, 'pts_fusion_layer') and self.pts_fusion_layer is not None
@property
def with_img_neck(self):
return hasattr(self, 'img_neck') and self.img_neck is not None
@property
def with_pts_neck(self):
return hasattr(self, 'pts_neck') and self.pts_neck is not None
@property
def with_img_rpn(self):
return hasattr(self, 'img_rpn_head') and self.img_rpn_head is not None
def extract_img_feat(self, img, img_meta):
if self.with_img_backbone:
if img.dim() == 5 and img.size(0) == 1:
img.squeeze_()
elif img.dim() == 5 and img.size(0) > 1:
B, N, C, H, W = img.size()
img = img.view(B * N, C, H, W)
img_feats = self.img_backbone(img)
else:
return None
if self.with_img_neck:
img_feats = self.img_neck(img_feats)
if torch.isnan(img_feats[0]).any():
import pdb
pdb.set_trace()
return img_feats
def extract_pts_feat(self, pts, img_feats, img_meta):
if not self.with_pts_bbox:
return None
voxels, num_points, coors = self.voxelize(pts)
voxel_features = self.pts_voxel_encoder(voxels, num_points, coors)
batch_size = coors[-1, 0] + 1
x = self.pts_middle_encoder(voxel_features, coors, batch_size)
x = self.pts_backbone(x)
if self.with_pts_neck:
x = self.pts_neck(x)
return x
def extract_feat(self, points, img, img_meta):
img_feats = self.extract_img_feat(img, img_meta)
pts_feats = self.extract_pts_feat(points, img_feats, img_meta)
return (img_feats, pts_feats)
@torch.no_grad()
def voxelize(self, points):
voxels, coors, num_points = [], [], []
for res in points:
res_voxels, res_coors, res_num_points = self.pts_voxel_layer(res)
voxels.append(res_voxels)
coors.append(res_coors)
num_points.append(res_num_points)
voxels = torch.cat(voxels, dim=0)
num_points = torch.cat(num_points, dim=0)
coors_batch = []
for i, coor in enumerate(coors):
coor_pad = F.pad(coor, (1, 0), mode='constant', value=i)
coors_batch.append(coor_pad)
coors_batch = torch.cat(coors_batch, dim=0)
return voxels, num_points, coors_batch
def forward_train(self,
points=None,
img_meta=None,
gt_bboxes_3d=None,
gt_labels_3d=None,
gt_labels=None,
gt_bboxes=None,
img=None,
proposals=None,
gt_bboxes_ignore=None):
img_feats, pts_feats = self.extract_feat(
points, img=img, img_meta=img_meta)
losses = dict()
if pts_feats:
losses_pts = self.forward_pts_train(pts_feats, gt_bboxes_3d,
gt_labels_3d, img_meta,
gt_bboxes_ignore)
losses.update(losses_pts)
if img_feats:
losses_img = self.forward_img_train(
img_feats,
img_meta=img_meta,
gt_bboxes=gt_bboxes,
gt_labels=gt_labels,
gt_bboxes_ignore=gt_bboxes_ignore,
proposals=proposals,
)
losses.update(losses_img)
return losses
def forward_pts_train(self,
pts_feats,
gt_bboxes_3d,
gt_labels_3d,
img_meta,
gt_bboxes_ignore=None):
outs = self.pts_bbox_head(pts_feats)
loss_inputs = outs + (gt_bboxes_3d, gt_labels_3d, img_meta)
losses = self.pts_bbox_head.loss(
*loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
return losses
def forward_img_train(self,
x,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
proposals=None):
losses = dict()
# RPN forward and loss
if self.with_img_rpn:
rpn_outs = self.img_rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.img_rpn)
rpn_losses = self.img_rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('img_rpn_proposal',
self.test_cfg.img_rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.img_rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
# assign gts and sample proposals
if self.with_img_bbox:
bbox_assigner = build_assigner(self.train_cfg.img_rcnn.assigner)
bbox_sampler = build_sampler(
self.train_cfg.img_rcnn.sampler, context=self)
num_imgs = len(img_meta)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
sampling_results = []
for i in range(num_imgs):
assign_result = bbox_assigner.assign(proposal_list[i],
gt_bboxes[i],
gt_bboxes_ignore[i],
gt_labels[i])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[i],
gt_bboxes[i],
gt_labels[i],
feats=[lvl_feat[i][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
if self.with_img_bbox:
rois = bbox2roi([res.bboxes for res in sampling_results])
# TODO: a more flexible way to decide which feature maps to use
bbox_feats = self.img_bbox_roi_extractor(
x[:self.img_bbox_roi_extractor.num_inputs], rois)
if self.with_shared_head:
bbox_feats = self.img_shared_head(bbox_feats)
cls_score, bbox_pred = self.img_bbox_head(bbox_feats)
bbox_targets = self.img_bbox_head.get_target(
sampling_results, gt_bboxes, gt_labels,
self.train_cfg.img_rcnn)
loss_bbox = self.img_bbox_head.loss(cls_score, bbox_pred,
*bbox_targets)
losses.update(loss_bbox)
return losses
def forward_test(self, **kwargs):
return self.simple_test(**kwargs)
def forward(self, return_loss=True, **kwargs):
if return_loss:
return self.forward_train(**kwargs)
else:
return self.forward_test(**kwargs)
def simple_test_img(self, x, img_meta, proposals=None, rescale=False):
"""Test without augmentation."""
if proposals is None:
proposal_list = self.simple_test_rpn(x, img_meta,
self.test_cfg.img_rpn)
else:
proposal_list = proposals
det_bboxes, det_labels = self.simple_test_bboxes(
x,
img_meta,
proposal_list,
self.test_cfg.img_rcnn,
rescale=rescale)
bbox_results = bbox2result_coco(det_bboxes, det_labels,
self.img_bbox_head.num_classes)
return bbox_results
def simple_test_bboxes(self,
x,
img_meta,
proposals,
rcnn_test_cfg,
rescale=False):
"""Test only det bboxes without augmentation."""
rois = bbox2roi(proposals)
roi_feats = self.img_bbox_roi_extractor(
x[:len(self.img_bbox_roi_extractor.featmap_strides)], rois)
if self.with_img_shared_head:
roi_feats = self.img_shared_head(roi_feats)
cls_score, bbox_pred = self.img_bbox_head(roi_feats)
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
det_bboxes, det_labels = self.img_bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
return det_bboxes, det_labels
def simple_test_rpn(self, x, img_meta, rpn_test_cfg):
rpn_outs = self.img_rpn_head(x)
proposal_inputs = rpn_outs + (img_meta, rpn_test_cfg)
proposal_list = self.img_rpn_head.get_bboxes(*proposal_inputs)
return proposal_list
def simple_test_pts(self, x, img_meta, rescale=False):
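        """Test the point-cloud branch without augmentation."""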
outs = self.pts_bbox_head(x)
bbox_inputs = outs + (img_meta, rescale)
bbox_list = self.pts_bbox_head.get_bboxes(*bbox_inputs)
return bbox_list
def simple_test(self,
points,
img_meta,
img=None,
gt_bboxes_3d=None,
rescale=False):
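        """Test without augmentation.

        Returns a dict that may contain ``pts_bbox`` (point-cloud branch) and
        ``img_bbox`` (image branch) results, depending on which features and
        heads are available.
        """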
img_feats, pts_feats = self.extract_feat(
points, img=img, img_meta=img_meta)
bbox_list = dict()
if pts_feats and self.with_pts_bbox:
bbox_pts = self.simple_test_pts(
pts_feats, img_meta, rescale=rescale)
bbox_list.update(pts_bbox=bbox_pts)
if img_feats and self.with_img_bbox:
bbox_img = self.simple_test_img(
img_feats, img_meta, rescale=rescale)
bbox_list.update(img_bbox=bbox_img)
return bbox_list
def aug_test(self, points, imgs, img_metas, rescale=False):
raise NotImplementedError
import torch.nn as nn
from mmdet3d.core import bbox2result_coco
from mmdet.models.registry import DETECTORS
from .. import builder
from .base import BaseDetector
@DETECTORS.register_module
class SingleStageDetector(BaseDetector):
"""Base class for single-stage detectors.
Single-stage detectors directly and densely predict bounding boxes on the
output features of the backbone+neck.
"""
def __init__(self,
backbone,
neck=None,
bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(SingleStageDetector, self).__init__()
self.backbone = builder.build_backbone(backbone)
if neck is not None:
self.neck = builder.build_neck(neck)
bbox_head.update(train_cfg=train_cfg)
bbox_head.update(test_cfg=test_cfg)
self.bbox_head = builder.build_head(bbox_head)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.init_weights(pretrained=pretrained)
def init_weights(self, pretrained=None):
super(SingleStageDetector, self).init_weights(pretrained)
self.backbone.init_weights(pretrained=pretrained)
if self.with_neck:
if isinstance(self.neck, nn.Sequential):
for m in self.neck:
m.init_weights()
else:
self.neck.init_weights()
self.bbox_head.init_weights()
def extract_feat(self, img):
"""Directly extract features from the backbone+neck
"""
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
return x
def forward_dummy(self, img):
"""Used for computing network flops.
        See `mmdetection/tools/get_flops.py`
"""
x = self.extract_feat(img)
outs = self.bbox_head(x)
return outs
def forward_train(self,
img,
img_metas,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None):
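        """Training forward function; returns a dict of losses computed by
        the bbox head."""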
x = self.extract_feat(img)
outs = self.bbox_head(x)
loss_inputs = outs + (gt_bboxes, gt_labels, img_metas, self.train_cfg)
losses = self.bbox_head.loss(
*loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
return losses
def simple_test(self, img, img_meta, rescale=False):
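        """Test without augmentation."""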
x = self.extract_feat(img)
outs = self.bbox_head(x)
bbox_inputs = outs + (img_meta, self.test_cfg, rescale)
bbox_list = self.bbox_head.get_bboxes(*bbox_inputs)
bbox_results = [
bbox2result_coco(det_bboxes, det_labels,
self.bbox_head.num_classes)
for det_bboxes, det_labels in bbox_list
]
return bbox_results[0]
def aug_test(self, imgs, img_metas, rescale=False):
raise NotImplementedError
import logging
import sys
import torch
from mmdet3d.core import (bbox2roi, bbox_mapping, merge_aug_bboxes,
merge_aug_masks, merge_aug_proposals, multiclass_nms)
logger = logging.getLogger(__name__)
if sys.version_info >= (3, 7):
from mmdet3d.utils.contextmanagers import completed
class RPNTestMixin(object):
if sys.version_info >= (3, 7):
async def async_test_rpn(self, x, img_meta, rpn_test_cfg):
sleep_interval = rpn_test_cfg.pop('async_sleep_interval', 0.025)
async with completed(
__name__, 'rpn_head_forward',
sleep_interval=sleep_interval):
rpn_outs = self.rpn_head(x)
proposal_inputs = rpn_outs + (img_meta, rpn_test_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
return proposal_list
def simple_test_rpn(self, x, img_meta, rpn_test_cfg):
rpn_outs = self.rpn_head(x)
proposal_inputs = rpn_outs + (img_meta, rpn_test_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
return proposal_list
def aug_test_rpn(self, feats, img_metas, rpn_test_cfg):
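        """Test the RPN with test-time augmentations and merge the proposals
        obtained from each augmentation."""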
imgs_per_gpu = len(img_metas[0])
aug_proposals = [[] for _ in range(imgs_per_gpu)]
for x, img_meta in zip(feats, img_metas):
proposal_list = self.simple_test_rpn(x, img_meta, rpn_test_cfg)
for i, proposals in enumerate(proposal_list):
aug_proposals[i].append(proposals)
# reorganize the order of 'img_metas' to match the dimensions
# of 'aug_proposals'
aug_img_metas = []
for i in range(imgs_per_gpu):
aug_img_meta = []
for j in range(len(img_metas)):
aug_img_meta.append(img_metas[j][i])
aug_img_metas.append(aug_img_meta)
# after merging, proposals will be rescaled to the original image size
merged_proposals = [
merge_aug_proposals(proposals, aug_img_meta, rpn_test_cfg)
for proposals, aug_img_meta in zip(aug_proposals, aug_img_metas)
]
return merged_proposals
class BBoxTestMixin(object):
if sys.version_info >= (3, 7):
async def async_test_bboxes(self,
x,
img_meta,
proposals,
rcnn_test_cfg,
rescale=False,
bbox_semaphore=None,
global_lock=None):
"""Async test only det bboxes without augmentation."""
rois = bbox2roi(proposals)
roi_feats = self.bbox_roi_extractor(
x[:len(self.bbox_roi_extractor.featmap_strides)], rois)
if self.with_shared_head:
roi_feats = self.shared_head(roi_feats)
sleep_interval = rcnn_test_cfg.get('async_sleep_interval', 0.017)
async with completed(
__name__, 'bbox_head_forward',
sleep_interval=sleep_interval):
cls_score, bbox_pred = self.bbox_head(roi_feats)
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
det_bboxes, det_labels = self.bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
return det_bboxes, det_labels
def simple_test_bboxes(self,
x,
img_meta,
proposals,
rcnn_test_cfg,
rescale=False):
"""Test only det bboxes without augmentation."""
rois = bbox2roi(proposals)
roi_feats = self.bbox_roi_extractor(
x[:len(self.bbox_roi_extractor.featmap_strides)], rois)
if self.with_shared_head:
roi_feats = self.shared_head(roi_feats)
cls_score, bbox_pred = self.bbox_head(roi_feats)
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
det_bboxes, det_labels = self.bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
return det_bboxes, det_labels
def aug_test_bboxes(self, feats, img_metas, proposal_list, rcnn_test_cfg):
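        """Test det bboxes with test-time augmentations."""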
aug_bboxes = []
aug_scores = []
for x, img_meta in zip(feats, img_metas):
# only one image in the batch
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
# TODO more flexible
proposals = bbox_mapping(proposal_list[0][:, :4], img_shape,
scale_factor, flip)
rois = bbox2roi([proposals])
# recompute feature maps to save GPU memory
roi_feats = self.bbox_roi_extractor(
x[:len(self.bbox_roi_extractor.featmap_strides)], rois)
if self.with_shared_head:
roi_feats = self.shared_head(roi_feats)
cls_score, bbox_pred = self.bbox_head(roi_feats)
bboxes, scores = self.bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=False,
cfg=None)
aug_bboxes.append(bboxes)
aug_scores.append(scores)
# after merging, bboxes will be rescaled to the original image size
merged_bboxes, merged_scores = merge_aug_bboxes(
aug_bboxes, aug_scores, img_metas, rcnn_test_cfg)
det_bboxes, det_labels = multiclass_nms(merged_bboxes, merged_scores,
rcnn_test_cfg.score_thr,
rcnn_test_cfg.nms,
rcnn_test_cfg.max_per_img)
return det_bboxes, det_labels
class MaskTestMixin(object):
if sys.version_info >= (3, 7):
async def async_test_mask(self,
x,
img_meta,
det_bboxes,
det_labels,
rescale=False,
mask_test_cfg=None):
# image shape of the first image in the batch (only one)
ori_shape = img_meta[0]['ori_shape']
scale_factor = img_meta[0]['scale_factor']
if det_bboxes.shape[0] == 0:
segm_result = [[]
for _ in range(self.mask_head.num_classes - 1)]
else:
_bboxes = (
det_bboxes[:, :4] *
scale_factor if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
mask_feats = self.mask_roi_extractor(
x[:len(self.mask_roi_extractor.featmap_strides)],
mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
if mask_test_cfg and mask_test_cfg.get('async_sleep_interval'):
sleep_interval = mask_test_cfg['async_sleep_interval']
else:
sleep_interval = 0.035
async with completed(
__name__,
'mask_head_forward',
sleep_interval=sleep_interval):
mask_pred = self.mask_head(mask_feats)
segm_result = self.mask_head.get_seg_masks(
mask_pred, _bboxes, det_labels, self.test_cfg.rcnn,
ori_shape, scale_factor, rescale)
return segm_result
def simple_test_mask(self,
x,
img_meta,
det_bboxes,
det_labels,
rescale=False):
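        """Simple test for the mask head without augmentation."""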
# image shape of the first image in the batch (only one)
ori_shape = img_meta[0]['ori_shape']
scale_factor = img_meta[0]['scale_factor']
if det_bboxes.shape[0] == 0:
segm_result = [[] for _ in range(self.mask_head.num_classes)]
else:
# if det_bboxes is rescaled to the original image size, we need to
# rescale it back to the testing scale to obtain RoIs.
if rescale and not isinstance(scale_factor, float):
scale_factor = torch.from_numpy(scale_factor).to(
det_bboxes.device)
_bboxes = (
det_bboxes[:, :4] * scale_factor if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
mask_feats = self.mask_roi_extractor(
x[:len(self.mask_roi_extractor.featmap_strides)], mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head(mask_feats)
segm_result = self.mask_head.get_seg_masks(mask_pred, _bboxes,
det_labels,
self.test_cfg.rcnn,
ori_shape, scale_factor,
rescale)
return segm_result
def aug_test_mask(self, feats, img_metas, det_bboxes, det_labels):
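        """Test masks with test-time augmentations."""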
if det_bboxes.shape[0] == 0:
segm_result = [[] for _ in range(self.mask_head.num_classes)]
else:
aug_masks = []
for x, img_meta in zip(feats, img_metas):
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
_bboxes = bbox_mapping(det_bboxes[:, :4], img_shape,
scale_factor, flip)
mask_rois = bbox2roi([_bboxes])
mask_feats = self.mask_roi_extractor(
x[:len(self.mask_roi_extractor.featmap_strides)],
mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head(mask_feats)
# convert to numpy array to save memory
aug_masks.append(mask_pred.sigmoid().cpu().numpy())
merged_masks = merge_aug_masks(aug_masks, img_metas,
self.test_cfg.rcnn)
ori_shape = img_metas[0][0]['ori_shape']
segm_result = self.mask_head.get_seg_masks(
merged_masks,
det_bboxes,
det_labels,
self.test_cfg.rcnn,
ori_shape,
scale_factor=1.0,
rescale=False)
return segm_result
import torch
import torch.nn as nn
from mmdet3d.core import (bbox2result_coco, bbox2roi, build_assigner,
build_sampler)
from mmdet.models.registry import DETECTORS
from .. import builder
from .base import BaseDetector
from .test_mixins import BBoxTestMixin, MaskTestMixin, RPNTestMixin
@DETECTORS.register_module
class TwoStageDetector(BaseDetector, RPNTestMixin, BBoxTestMixin,
MaskTestMixin):
"""Base class for two-stage detectors.
    Two-stage detectors typically consist of a region proposal network and a
    task-specific regression head.
"""
def __init__(self,
backbone,
neck=None,
shared_head=None,
rpn_head=None,
bbox_roi_extractor=None,
bbox_head=None,
mask_roi_extractor=None,
mask_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(TwoStageDetector, self).__init__()
self.backbone = builder.build_backbone(backbone)
if neck is not None:
self.neck = builder.build_neck(neck)
if shared_head is not None:
self.shared_head = builder.build_shared_head(shared_head)
if rpn_head is not None:
self.rpn_head = builder.build_head(rpn_head)
if bbox_head is not None:
self.bbox_roi_extractor = builder.build_roi_extractor(
bbox_roi_extractor)
self.bbox_head = builder.build_head(bbox_head)
if mask_head is not None:
if mask_roi_extractor is not None:
self.mask_roi_extractor = builder.build_roi_extractor(
mask_roi_extractor)
self.share_roi_extractor = False
else:
self.share_roi_extractor = True
self.mask_roi_extractor = self.bbox_roi_extractor
self.mask_head = builder.build_head(mask_head)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.init_weights(pretrained=pretrained)
@property
def with_rpn(self):
return hasattr(self, 'rpn_head') and self.rpn_head is not None
def init_weights(self, pretrained=None):
super(TwoStageDetector, self).init_weights(pretrained)
self.backbone.init_weights(pretrained=pretrained)
if self.with_neck:
if isinstance(self.neck, nn.Sequential):
for m in self.neck:
m.init_weights()
else:
self.neck.init_weights()
if self.with_shared_head:
self.shared_head.init_weights(pretrained=pretrained)
if self.with_rpn:
self.rpn_head.init_weights()
if self.with_bbox:
self.bbox_roi_extractor.init_weights()
self.bbox_head.init_weights()
if self.with_mask:
self.mask_head.init_weights()
if not self.share_roi_extractor:
self.mask_roi_extractor.init_weights()
def extract_feat(self, img):
"""Directly extract features from the backbone+neck
"""
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
return x
def forward_dummy(self, img):
"""Used for computing network flops.
        See `mmdetection/tools/get_flops.py`
"""
outs = ()
# backbone
x = self.extract_feat(img)
# rpn
if self.with_rpn:
rpn_outs = self.rpn_head(x)
outs = outs + (rpn_outs, )
proposals = torch.randn(1000, 4).cuda()
# bbox head
rois = bbox2roi([proposals])
if self.with_bbox:
bbox_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = self.bbox_head(bbox_feats)
outs = outs + (cls_score, bbox_pred)
# mask head
if self.with_mask:
mask_rois = rois[:100]
mask_feats = self.mask_roi_extractor(
x[:self.mask_roi_extractor.num_inputs], mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head(mask_feats)
outs = outs + (mask_pred, )
return outs
def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
proposals=None):
"""
Args:
img (Tensor): of shape (N, C, H, W) encoding input images.
Typically these should be mean centered and std scaled.
            img_meta (list[dict]): list of image info dicts, where each dict
                has: 'img_shape', 'scale_factor', 'flip', and may also contain
                'filename', 'ori_shape', 'pad_shape', and 'img_norm_cfg'.
                For details on the values of these keys see
                `mmdet/datasets/pipelines/formatting.py:Collect`.
            gt_bboxes (list[Tensor]): each item is the ground-truth boxes of
                the corresponding image, in [tl_x, tl_y, br_x, br_y] format.
            gt_labels (list[Tensor]): class indices corresponding to each box.
            gt_bboxes_ignore (None | list[Tensor]): specify which bounding
                boxes can be ignored when computing the loss.
            gt_masks (None | Tensor): true segmentation masks for each box,
                used if the architecture supports a segmentation task.
            proposals: override rpn proposals with custom proposals. Use when
                `with_rpn` is False.
Returns:
dict[str, Tensor]: a dictionary of loss components
"""
x = self.extract_feat(img)
losses = dict()
# RPN forward and loss
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('rpn_proposal',
self.test_cfg.rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
# assign gts and sample proposals
if self.with_bbox or self.with_mask:
bbox_assigner = build_assigner(self.train_cfg.rcnn.assigner)
bbox_sampler = build_sampler(
self.train_cfg.rcnn.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
sampling_results = []
for i in range(num_imgs):
assign_result = bbox_assigner.assign(proposal_list[i],
gt_bboxes[i],
gt_bboxes_ignore[i],
gt_labels[i])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[i],
gt_bboxes[i],
gt_labels[i],
feats=[lvl_feat[i][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
if self.with_bbox:
rois = bbox2roi([res.bboxes for res in sampling_results])
# TODO: a more flexible way to decide which feature maps to use
bbox_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = self.bbox_head(bbox_feats)
bbox_targets = self.bbox_head.get_target(sampling_results,
gt_bboxes, gt_labels,
self.train_cfg.rcnn)
loss_bbox = self.bbox_head.loss(cls_score, bbox_pred,
*bbox_targets)
losses.update(loss_bbox)
# mask head forward and loss
if self.with_mask:
if not self.share_roi_extractor:
pos_rois = bbox2roi(
[res.pos_bboxes for res in sampling_results])
mask_feats = self.mask_roi_extractor(
x[:self.mask_roi_extractor.num_inputs], pos_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
else:
pos_inds = []
device = bbox_feats.device
for res in sampling_results:
pos_inds.append(
torch.ones(
res.pos_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds.append(
torch.zeros(
res.neg_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds = torch.cat(pos_inds)
mask_feats = bbox_feats[pos_inds]
if mask_feats.shape[0] > 0:
mask_pred = self.mask_head(mask_feats)
mask_targets = self.mask_head.get_target(
sampling_results, gt_masks, self.train_cfg.rcnn)
pos_labels = torch.cat(
[res.pos_gt_labels for res in sampling_results])
loss_mask = self.mask_head.loss(mask_pred, mask_targets,
pos_labels)
losses.update(loss_mask)
return losses
def simple_test(self, img, img_meta, proposals=None, rescale=False):
"""Test without augmentation."""
assert self.with_bbox, 'Bbox head must be implemented.'
x = self.extract_feat(img)
if proposals is None:
proposal_list = self.simple_test_rpn(x, img_meta,
self.test_cfg.rpn)
else:
proposal_list = proposals
det_bboxes, det_labels = self.simple_test_bboxes(
x, img_meta, proposal_list, self.test_cfg.rcnn, rescale=rescale)
bbox_results = bbox2result_coco(det_bboxes, det_labels,
self.bbox_head.num_classes)
if not self.with_mask:
return bbox_results
else:
segm_results = self.simple_test_mask(
x, img_meta, det_bboxes, det_labels, rescale=rescale)
return bbox_results, segm_results
def aug_test(self, imgs, img_metas, rescale=False):
"""Test with augmentations.
If rescale is False, then returned bboxes and masks will fit the scale
of imgs[0].
"""
# recompute feats to save memory
proposal_list = self.aug_test_rpn(
self.extract_feats(imgs), img_metas, self.test_cfg.rpn)
det_bboxes, det_labels = self.aug_test_bboxes(
self.extract_feats(imgs), img_metas, proposal_list,
self.test_cfg.rcnn)
if rescale:
_det_bboxes = det_bboxes
else:
_det_bboxes = det_bboxes.clone()
_det_bboxes[:, :4] *= img_metas[0][0]['scale_factor']
bbox_results = bbox2result_coco(_det_bboxes, det_labels,
self.bbox_head.num_classes)
# det_bboxes always keep the original scale
if self.with_mask:
segm_results = self.aug_test_mask(
self.extract_feats(imgs), img_metas, det_bboxes, det_labels)
return bbox_results, segm_results
else:
return bbox_results
import torch
import torch.nn.functional as F
from mmdet3d.ops import Voxelization
from mmdet.models.registry import DETECTORS
from .. import builder
from .single_stage import SingleStageDetector
@DETECTORS.register_module
class VoxelNet(SingleStageDetector):
def __init__(self,
voxel_layer,
voxel_encoder,
middle_encoder,
backbone,
neck=None,
bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(VoxelNet, self).__init__(
backbone=backbone,
neck=neck,
bbox_head=bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained,
)
self.voxel_layer = Voxelization(**voxel_layer)
self.voxel_encoder = builder.build_voxel_encoder(voxel_encoder)
self.middle_encoder = builder.build_middle_encoder(middle_encoder)
def extract_feat(self, points, img_meta):
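        """Extract features from points: voxelize, encode the voxels, run the
        middle encoder, then the backbone (and neck if present)."""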
voxels, num_points, coors = self.voxelize(points)
voxel_features = self.voxel_encoder(voxels, num_points, coors)
batch_size = coors[-1, 0].item() + 1
x = self.middle_encoder(voxel_features, coors, batch_size)
x = self.backbone(x)
if self.with_neck:
x = self.neck(x)
return x
@torch.no_grad()
def voxelize(self, points):
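        """Apply the voxel layer to each point cloud and collate the results
        into a single batch with a leading sample index in the coords."""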
voxels, coors, num_points = [], [], []
for res in points:
res_voxels, res_coors, res_num_points = self.voxel_layer(res)
voxels.append(res_voxels)
coors.append(res_coors)
num_points.append(res_num_points)
voxels = torch.cat(voxels, dim=0)
num_points = torch.cat(num_points, dim=0)
coors_batch = []
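        # prepend the sample index i so each row of coors becomes
        # (batch_idx, *coor)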
for i, coor in enumerate(coors):
coor_pad = F.pad(coor, (1, 0), mode='constant', value=i)
coors_batch.append(coor_pad)
coors_batch = torch.cat(coors_batch, dim=0)
return voxels, num_points, coors_batch
def forward_train(self,
points,
img_meta,
gt_bboxes_3d,
gt_labels_3d,
gt_bboxes_ignore=None):
x = self.extract_feat(points, img_meta)
outs = self.bbox_head(x)
loss_inputs = outs + (gt_bboxes_3d, gt_labels_3d, img_meta)
losses = self.bbox_head.loss(
*loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
return losses
def forward_test(self, **kwargs):
return self.simple_test(**kwargs)
def forward(self, return_loss=True, **kwargs):
if return_loss:
return self.forward_train(**kwargs)
else:
return self.forward_test(**kwargs)
def simple_test(self, points, img_meta, gt_bboxes_3d=None, rescale=False):
x = self.extract_feat(points, img_meta)
outs = self.bbox_head(x)
bbox_inputs = outs + (img_meta, rescale)
bbox_list = self.bbox_head.get_bboxes(*bbox_inputs)
return bbox_list
@DETECTORS.register_module
class DynamicVoxelNet(VoxelNet):
def __init__(self,
voxel_layer,
voxel_encoder,
middle_encoder,
backbone,
neck=None,
bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(DynamicVoxelNet, self).__init__(
voxel_layer=voxel_layer,
voxel_encoder=voxel_encoder,
middle_encoder=middle_encoder,
backbone=backbone,
neck=neck,
bbox_head=bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained,
)
def extract_feat(self, points, img_meta):
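        """Extract features with dynamic voxelization: the voxel encoder maps
        the raw points and their voxel coords to per-voxel features, which
        then go through the middle encoder, backbone and neck."""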
voxels, coors = self.voxelize(points)
voxel_features, feature_coors = self.voxel_encoder(voxels, coors)
batch_size = coors[-1, 0].item() + 1
x = self.middle_encoder(voxel_features, feature_coors, batch_size)
x = self.backbone(x)
if self.with_neck:
x = self.neck(x)
return x
@torch.no_grad()
def voxelize(self, points):
coors = []
        # dynamic voxelization only provides a coors mapping for each point
for res in points:
res_coors = self.voxel_layer(res)
coors.append(res_coors)
points = torch.cat(points, dim=0)
coors_batch = []
for i, coor in enumerate(coors):
coor_pad = F.pad(coor, (1, 0), mode='constant', value=i)
coors_batch.append(coor_pad)
coors_batch = torch.cat(coors_batch, dim=0)
return points, coors_batch
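# Minimal standalone sketch (illustrative, not part of the detector code above)
# of the batch collation used by ``voxelize``: per-sample voxel coordinates are
# padded with the sample index so the concatenated tensor can still be split by
# batch, and the batch size can be recovered as in ``extract_feat``.
import torch
import torch.nn.functional as F

coors = [
    torch.tensor([[1, 2, 3], [4, 5, 6]]),  # voxel coords of sample 0
    torch.tensor([[7, 8, 9]]),  # voxel coords of sample 1
]
coors_batch = torch.cat(
    [F.pad(c, (1, 0), mode='constant', value=i) for i, c in enumerate(coors)],
    dim=0)
# coors_batch[:, 0] is the batch index -> tensor([0, 0, 1])
batch_size = coors_batch[-1, 0].item() + 1  # 2, as in VoxelNet.extract_feat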