Commit 57f6da5c authored by bailuo

readme
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules.utils import _pair
from mmdet.core import (auto_fp16, bbox_target, delta2bbox, force_fp32,
multiclass_nms)
from ..builder import build_loss
from ..losses import accuracy
from ..registry import HEADS
@HEADS.register_module
class BBoxHead(nn.Module):
"""Simplest RoI head, with only two fc layers for classification and
regression respectively"""
def __init__(self,
with_avg_pool=False,
with_cls=True,
with_reg=True,
roi_feat_size=7,
in_channels=256,
num_classes=81,
target_means=[0., 0., 0., 0.],
target_stds=[0.1, 0.1, 0.2, 0.2],
reg_class_agnostic=False,
loss_cls=dict(
type='CrossEntropyLoss',
use_sigmoid=False,
loss_weight=1.0),
loss_bbox=dict(
type='SmoothL1Loss', beta=1.0, loss_weight=1.0)):
super(BBoxHead, self).__init__()
assert with_cls or with_reg
self.with_avg_pool = with_avg_pool
self.with_cls = with_cls
self.with_reg = with_reg
self.roi_feat_size = _pair(roi_feat_size)
self.roi_feat_area = self.roi_feat_size[0] * self.roi_feat_size[1]
self.in_channels = in_channels
self.num_classes = num_classes
self.target_means = target_means
self.target_stds = target_stds
self.reg_class_agnostic = reg_class_agnostic
self.fp16_enabled = False
self.loss_cls = build_loss(loss_cls)
self.loss_bbox = build_loss(loss_bbox)
in_channels = self.in_channels
if self.with_avg_pool:
self.avg_pool = nn.AvgPool2d(self.roi_feat_size)
else:
in_channels *= self.roi_feat_area
if self.with_cls:
self.fc_cls = nn.Linear(in_channels, num_classes)
if self.with_reg:
out_dim_reg = 4 if reg_class_agnostic else 4 * num_classes
self.fc_reg = nn.Linear(in_channels, out_dim_reg)
self.debug_imgs = None
def init_weights(self):
if self.with_cls:
nn.init.normal_(self.fc_cls.weight, 0, 0.01)
nn.init.constant_(self.fc_cls.bias, 0)
if self.with_reg:
nn.init.normal_(self.fc_reg.weight, 0, 0.001)
nn.init.constant_(self.fc_reg.bias, 0)
@auto_fp16()
def forward(self, x):
if self.with_avg_pool:
x = self.avg_pool(x)
x = x.view(x.size(0), -1)
cls_score = self.fc_cls(x) if self.with_cls else None
bbox_pred = self.fc_reg(x) if self.with_reg else None
return cls_score, bbox_pred
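    # Shape walk-through for forward() above (a sketch, assuming the default
    # roi_feat_size=7, in_channels=256, num_classes=81, with_avg_pool=False
    # and reg_class_agnostic=False):
    #   x:         (num_rois, 256, 7, 7)
    #   flattened: (num_rois, 256 * 7 * 7) = (num_rois, 12544)
    #   cls_score: (num_rois, 81)
    #   bbox_pred: (num_rois, 4 * 81) = (num_rois, 324)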
def get_target(self, sampling_results, gt_bboxes, gt_labels,
rcnn_train_cfg):
pos_proposals = [res.pos_bboxes for res in sampling_results]
neg_proposals = [res.neg_bboxes for res in sampling_results]
pos_gt_bboxes = [res.pos_gt_bboxes for res in sampling_results]
pos_gt_labels = [res.pos_gt_labels for res in sampling_results]
reg_classes = 1 if self.reg_class_agnostic else self.num_classes
cls_reg_targets = bbox_target(
pos_proposals,
neg_proposals,
pos_gt_bboxes,
pos_gt_labels,
rcnn_train_cfg,
reg_classes,
target_means=self.target_means,
target_stds=self.target_stds)
return cls_reg_targets
@force_fp32(apply_to=('cls_score', 'bbox_pred'))
def loss(self,
cls_score,
bbox_pred,
labels,
label_weights,
bbox_targets,
bbox_weights,
reduction_override=None):
losses = dict()
if cls_score is not None:
avg_factor = max(torch.sum(label_weights > 0).float().item(), 1.)
if cls_score.numel() > 0:
losses['loss_cls'] = self.loss_cls(
cls_score,
labels,
label_weights,
avg_factor=avg_factor,
reduction_override=reduction_override)
losses['acc'] = accuracy(cls_score, labels)
if bbox_pred is not None:
pos_inds = labels > 0
if pos_inds.any():
if self.reg_class_agnostic:
pos_bbox_pred = bbox_pred.view(bbox_pred.size(0),
4)[pos_inds]
else:
pos_bbox_pred = bbox_pred.view(bbox_pred.size(0), -1,
4)[pos_inds,
labels[pos_inds]]
losses['loss_bbox'] = self.loss_bbox(
pos_bbox_pred,
bbox_targets[pos_inds],
bbox_weights[pos_inds],
avg_factor=bbox_targets.size(0),
reduction_override=reduction_override)
return losses
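    # Note on the class-specific regression path in loss() (descriptive, not
    # part of the original file): with reg_class_agnostic=False, bbox_pred has
    # shape (num_rois, 4 * num_classes); viewing it as (num_rois, -1, 4) and
    # indexing with [pos_inds, labels[pos_inds]] keeps, for each positive RoI,
    # only the 4 deltas predicted for its assigned class, so the deltas of all
    # other classes do not contribute to loss_bbox.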
@force_fp32(apply_to=('cls_score', 'bbox_pred'))
def get_det_bboxes(self,
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=False,
cfg=None):
if isinstance(cls_score, list):
cls_score = sum(cls_score) / float(len(cls_score))
scores = F.softmax(cls_score, dim=1) if cls_score is not None else None
if bbox_pred is not None:
bboxes = delta2bbox(rois[:, 1:], bbox_pred, self.target_means,
self.target_stds, img_shape)
else:
bboxes = rois[:, 1:].clone()
if img_shape is not None:
            # use slices with a step so clamp_ modifies views of `bboxes` in
            # place (fancy indexing would clamp a copy and have no effect)
            bboxes[:, 0::2].clamp_(min=0, max=img_shape[1] - 1)
            bboxes[:, 1::2].clamp_(min=0, max=img_shape[0] - 1)
if rescale:
if isinstance(scale_factor, float):
bboxes /= scale_factor
else:
scale_factor = torch.from_numpy(scale_factor).to(bboxes.device)
bboxes = (bboxes.view(bboxes.size(0), -1, 4) /
scale_factor).view(bboxes.size()[0], -1)
if cfg is None:
return bboxes, scores
else:
det_bboxes, det_labels = multiclass_nms(bboxes, scores,
cfg.score_thr, cfg.nms,
cfg.max_per_img)
return det_bboxes, det_labels
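    # Behaviour of the two exit paths in get_det_bboxes() (descriptive): with
    # cfg=None the raw (bboxes, scores) pair is returned so callers such as
    # aug_test can merge results before NMS; with a test cfg, multiclass_nms
    # thresholds scores at cfg.score_thr, applies per-class NMS with cfg.nms
    # and keeps at most cfg.max_per_img detections.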
@force_fp32(apply_to=('bbox_preds', ))
def refine_bboxes(self, rois, labels, bbox_preds, pos_is_gts, img_metas):
"""Refine bboxes during training.
Args:
            rois (Tensor): Shape (n*bs, 5), where n is the number of images
                per GPU and bs is the number of sampled RoIs per image.
                The first column is the image id and the next 4 columns
                are x1, y1, x2, y2.
labels (Tensor): Shape (n*bs, ).
bbox_preds (Tensor): Shape (n*bs, 4) or (n*bs, 4*#class).
pos_is_gts (list[Tensor]): Flags indicating if each positive bbox
is a gt bbox.
img_metas (list[dict]): Meta info of each image.
Returns:
list[Tensor]: Refined bboxes of each image in a mini-batch.
Example:
>>> # xdoctest: +REQUIRES(module:kwarray)
>>> import kwarray
>>> import numpy as np
>>> from mmdet.core.bbox.demodata import random_boxes
>>> self = BBoxHead(reg_class_agnostic=True)
>>> n_roi = 2
>>> n_img = 4
>>> scale = 512
>>> rng = np.random.RandomState(0)
>>> img_metas = [{'img_shape': (scale, scale)}
... for _ in range(n_img)]
>>> # Create rois in the expected format
>>> roi_boxes = random_boxes(n_roi, scale=scale, rng=rng)
>>> img_ids = torch.randint(0, n_img, (n_roi,))
>>> img_ids = img_ids.float()
>>> rois = torch.cat([img_ids[:, None], roi_boxes], dim=1)
>>> # Create other args
>>> labels = torch.randint(0, 2, (n_roi,)).long()
>>> bbox_preds = random_boxes(n_roi, scale=scale, rng=rng)
>>> # For each image, pretend random positive boxes are gts
        >>> is_label_pos = (labels.numpy() > 0).astype(int)
>>> lbl_per_img = kwarray.group_items(is_label_pos,
... img_ids.numpy())
>>> pos_per_img = [sum(lbl_per_img.get(gid, []))
... for gid in range(n_img)]
>>> pos_is_gts = [
>>> torch.randint(0, 2, (npos,)).byte().sort(
>>> descending=True)[0]
>>> for npos in pos_per_img
>>> ]
>>> bboxes_list = self.refine_bboxes(rois, labels, bbox_preds,
>>> pos_is_gts, img_metas)
>>> print(bboxes_list)
"""
img_ids = rois[:, 0].long().unique(sorted=True)
assert img_ids.numel() <= len(img_metas)
bboxes_list = []
for i in range(len(img_metas)):
inds = torch.nonzero(rois[:, 0] == i).squeeze(dim=1)
num_rois = inds.numel()
bboxes_ = rois[inds, 1:]
label_ = labels[inds]
bbox_pred_ = bbox_preds[inds]
img_meta_ = img_metas[i]
pos_is_gts_ = pos_is_gts[i]
bboxes = self.regress_by_class(bboxes_, label_, bbox_pred_,
img_meta_)
# filter gt bboxes
pos_keep = 1 - pos_is_gts_
keep_inds = pos_is_gts_.new_ones(num_rois)
keep_inds[:len(pos_is_gts_)] = pos_keep
bboxes_list.append(bboxes[keep_inds])
return bboxes_list
@force_fp32(apply_to=('bbox_pred', ))
def regress_by_class(self, rois, label, bbox_pred, img_meta):
"""Regress the bbox for the predicted class. Used in Cascade R-CNN.
Args:
rois (Tensor): shape (n, 4) or (n, 5)
label (Tensor): shape (n, )
bbox_pred (Tensor): shape (n, 4*(#class+1)) or (n, 4)
img_meta (dict): Image meta info.
Returns:
Tensor: Regressed bboxes, the same shape as input rois.
"""
assert rois.size(1) == 4 or rois.size(1) == 5, repr(rois.shape)
if not self.reg_class_agnostic:
label = label * 4
inds = torch.stack((label, label + 1, label + 2, label + 3), 1)
bbox_pred = torch.gather(bbox_pred, 1, inds)
assert bbox_pred.size(1) == 4
if rois.size(1) == 4:
new_rois = delta2bbox(rois, bbox_pred, self.target_means,
self.target_stds, img_meta['img_shape'])
else:
bboxes = delta2bbox(rois[:, 1:], bbox_pred, self.target_means,
self.target_stds, img_meta['img_shape'])
new_rois = torch.cat((rois[:, [0]], bboxes), dim=1)
return new_rois
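# Minimal usage sketch for BBoxHead (illustrative only; follows the doctest
# conventions used above and assumes the default constructor arguments):
# >>> self = BBoxHead(num_classes=81)
# >>> self.init_weights()
# >>> feats = torch.rand(8, 256, 7, 7)
# >>> cls_score, bbox_pred = self(feats)
# >>> assert cls_score.shape == (8, 81) and bbox_pred.shape == (8, 324)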
import torch.nn as nn
from ..registry import HEADS
from ..utils import ConvModule
from .bbox_head import BBoxHead
@HEADS.register_module
class ConvFCBBoxHead(BBoxHead):
r"""More general bbox head, with shared conv and fc layers and two optional
separated branches.
                                /-> cls convs -> cls fcs -> cls
    shared convs -> shared fcs
                                \-> reg convs -> reg fcs -> reg
""" # noqa: W605
def __init__(self,
num_shared_convs=0,
num_shared_fcs=0,
num_cls_convs=0,
num_cls_fcs=0,
num_reg_convs=0,
num_reg_fcs=0,
conv_out_channels=256,
fc_out_channels=1024,
conv_cfg=None,
norm_cfg=None,
*args,
**kwargs):
super(ConvFCBBoxHead, self).__init__(*args, **kwargs)
assert (num_shared_convs + num_shared_fcs + num_cls_convs +
num_cls_fcs + num_reg_convs + num_reg_fcs > 0)
if num_cls_convs > 0 or num_reg_convs > 0:
assert num_shared_fcs == 0
if not self.with_cls:
assert num_cls_convs == 0 and num_cls_fcs == 0
if not self.with_reg:
assert num_reg_convs == 0 and num_reg_fcs == 0
self.num_shared_convs = num_shared_convs
self.num_shared_fcs = num_shared_fcs
self.num_cls_convs = num_cls_convs
self.num_cls_fcs = num_cls_fcs
self.num_reg_convs = num_reg_convs
self.num_reg_fcs = num_reg_fcs
self.conv_out_channels = conv_out_channels
self.fc_out_channels = fc_out_channels
self.conv_cfg = conv_cfg
self.norm_cfg = norm_cfg
# add shared convs and fcs
self.shared_convs, self.shared_fcs, last_layer_dim = \
self._add_conv_fc_branch(
self.num_shared_convs, self.num_shared_fcs, self.in_channels,
True)
self.shared_out_channels = last_layer_dim
# add cls specific branch
self.cls_convs, self.cls_fcs, self.cls_last_dim = \
self._add_conv_fc_branch(
self.num_cls_convs, self.num_cls_fcs, self.shared_out_channels)
# add reg specific branch
self.reg_convs, self.reg_fcs, self.reg_last_dim = \
self._add_conv_fc_branch(
self.num_reg_convs, self.num_reg_fcs, self.shared_out_channels)
if self.num_shared_fcs == 0 and not self.with_avg_pool:
if self.num_cls_fcs == 0:
self.cls_last_dim *= self.roi_feat_area
if self.num_reg_fcs == 0:
self.reg_last_dim *= self.roi_feat_area
self.relu = nn.ReLU(inplace=True)
# reconstruct fc_cls and fc_reg since input channels are changed
if self.with_cls:
self.fc_cls = nn.Linear(self.cls_last_dim, self.num_classes)
if self.with_reg:
out_dim_reg = (4 if self.reg_class_agnostic else 4 *
self.num_classes)
self.fc_reg = nn.Linear(self.reg_last_dim, out_dim_reg)
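        # Dimension bookkeeping example (a sketch, assuming the common config
        # num_shared_fcs=2, fc_out_channels=1024, in_channels=256,
        # roi_feat_size=7 and with_avg_pool=False):
        #   shared fc input:  256 * 7 * 7 = 12544
        #   shared fc output: 1024 -> cls_last_dim = reg_last_dim = 1024
        # so fc_cls maps 1024 -> num_classes and fc_reg maps 1024 ->
        # 4 * num_classes.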
def _add_conv_fc_branch(self,
num_branch_convs,
num_branch_fcs,
in_channels,
is_shared=False):
"""Add shared or separable branch
convs -> avg pool (optional) -> fcs
"""
last_layer_dim = in_channels
# add branch specific conv layers
branch_convs = nn.ModuleList()
if num_branch_convs > 0:
for i in range(num_branch_convs):
conv_in_channels = (
last_layer_dim if i == 0 else self.conv_out_channels)
branch_convs.append(
ConvModule(
conv_in_channels,
self.conv_out_channels,
3,
padding=1,
conv_cfg=self.conv_cfg,
norm_cfg=self.norm_cfg))
last_layer_dim = self.conv_out_channels
# add branch specific fc layers
branch_fcs = nn.ModuleList()
if num_branch_fcs > 0:
# for shared branch, only consider self.with_avg_pool
# for separated branches, also consider self.num_shared_fcs
if (is_shared
or self.num_shared_fcs == 0) and not self.with_avg_pool:
last_layer_dim *= self.roi_feat_area
for i in range(num_branch_fcs):
fc_in_channels = (
last_layer_dim if i == 0 else self.fc_out_channels)
branch_fcs.append(
nn.Linear(fc_in_channels, self.fc_out_channels))
last_layer_dim = self.fc_out_channels
return branch_convs, branch_fcs, last_layer_dim
def init_weights(self):
super(ConvFCBBoxHead, self).init_weights()
for module_list in [self.shared_fcs, self.cls_fcs, self.reg_fcs]:
for m in module_list.modules():
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
nn.init.constant_(m.bias, 0)
def forward(self, x):
# shared part
if self.num_shared_convs > 0:
for conv in self.shared_convs:
x = conv(x)
if self.num_shared_fcs > 0:
if self.with_avg_pool:
x = self.avg_pool(x)
x = x.flatten(1)
for fc in self.shared_fcs:
x = self.relu(fc(x))
# separate branches
x_cls = x
x_reg = x
for conv in self.cls_convs:
x_cls = conv(x_cls)
if x_cls.dim() > 2:
if self.with_avg_pool:
x_cls = self.avg_pool(x_cls)
x_cls = x_cls.flatten(1)
for fc in self.cls_fcs:
x_cls = self.relu(fc(x_cls))
for conv in self.reg_convs:
x_reg = conv(x_reg)
if x_reg.dim() > 2:
if self.with_avg_pool:
x_reg = self.avg_pool(x_reg)
x_reg = x_reg.flatten(1)
for fc in self.reg_fcs:
x_reg = self.relu(fc(x_reg))
cls_score = self.fc_cls(x_cls) if self.with_cls else None
bbox_pred = self.fc_reg(x_reg) if self.with_reg else None
return cls_score, bbox_pred
@HEADS.register_module
class SharedFCBBoxHead(ConvFCBBoxHead):
def __init__(self, num_fcs=2, fc_out_channels=1024, *args, **kwargs):
assert num_fcs >= 1
super(SharedFCBBoxHead, self).__init__(
num_shared_convs=0,
num_shared_fcs=num_fcs,
num_cls_convs=0,
num_cls_fcs=0,
num_reg_convs=0,
num_reg_fcs=0,
fc_out_channels=fc_out_channels,
*args,
**kwargs)
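# Configuration sketch (illustrative): SharedFCBBoxHead above is the common
# "two shared fc" head, i.e. with the defaults it is equivalent to
# >>> head = ConvFCBBoxHead(
# ...     num_shared_convs=0, num_shared_fcs=2,
# ...     num_cls_convs=0, num_cls_fcs=0,
# ...     num_reg_convs=0, num_reg_fcs=0,
# ...     fc_out_channels=1024)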
import torch.nn as nn
from mmcv.cnn.weight_init import normal_init, xavier_init
from ..backbones.resnet import Bottleneck
from ..registry import HEADS
from ..utils import ConvModule
from .bbox_head import BBoxHead
class BasicResBlock(nn.Module):
"""Basic residual block.
    This block differs slightly from the block in the ResNet backbone:
    conv2 here uses a 1x1 kernel, whereas both convs in the ResNet
    BasicBlock use 3x3 kernels.
Args:
in_channels (int): Channels of the input feature map.
out_channels (int): Channels of the output feature map.
conv_cfg (dict): The config dict for convolution layers.
norm_cfg (dict): The config dict for normalization layers.
"""
def __init__(self,
in_channels,
out_channels,
conv_cfg=None,
norm_cfg=dict(type='BN')):
super(BasicResBlock, self).__init__()
# main path
self.conv1 = ConvModule(
in_channels,
in_channels,
kernel_size=3,
padding=1,
bias=False,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg)
self.conv2 = ConvModule(
in_channels,
out_channels,
kernel_size=1,
bias=False,
activation=None,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg)
# identity path
self.conv_identity = ConvModule(
in_channels,
out_channels,
kernel_size=1,
conv_cfg=conv_cfg,
norm_cfg=norm_cfg,
activation=None)
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
identity = x
x = self.conv1(x)
x = self.conv2(x)
identity = self.conv_identity(identity)
out = x + identity
out = self.relu(out)
return out
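# Shape sketch for BasicResBlock (an illustration, assuming in_channels=256
# and out_channels=1024 as used by DoubleConvFCBBoxHead below):
#   conv1 (3x3):         (N, 256, 7, 7) -> (N, 256, 7, 7)
#   conv2 (1x1):         (N, 256, 7, 7) -> (N, 1024, 7, 7)
#   conv_identity (1x1): (N, 256, 7, 7) -> (N, 1024, 7, 7)
#   out = relu(conv2 path + identity path), shape (N, 1024, 7, 7)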
@HEADS.register_module
class DoubleConvFCBBoxHead(BBoxHead):
r"""Bbox head used in Double-Head R-CNN
                                      /-> cls
                  /-> shared convs ->
                                      \-> reg
    roi features
                                      /-> cls
                  \-> shared fcs ->
                                      \-> reg
""" # noqa: W605
def __init__(self,
num_convs=0,
num_fcs=0,
conv_out_channels=1024,
fc_out_channels=1024,
conv_cfg=None,
norm_cfg=dict(type='BN'),
**kwargs):
kwargs.setdefault('with_avg_pool', True)
super(DoubleConvFCBBoxHead, self).__init__(**kwargs)
assert self.with_avg_pool
assert num_convs > 0
assert num_fcs > 0
self.num_convs = num_convs
self.num_fcs = num_fcs
self.conv_out_channels = conv_out_channels
self.fc_out_channels = fc_out_channels
self.conv_cfg = conv_cfg
self.norm_cfg = norm_cfg
# increase the channel of input features
self.res_block = BasicResBlock(self.in_channels,
self.conv_out_channels)
# add conv heads
self.conv_branch = self._add_conv_branch()
# add fc heads
self.fc_branch = self._add_fc_branch()
out_dim_reg = 4 if self.reg_class_agnostic else 4 * self.num_classes
self.fc_reg = nn.Linear(self.conv_out_channels, out_dim_reg)
self.fc_cls = nn.Linear(self.fc_out_channels, self.num_classes)
self.relu = nn.ReLU(inplace=True)
def _add_conv_branch(self):
"""Add the fc branch which consists of a sequential of conv layers"""
branch_convs = nn.ModuleList()
for i in range(self.num_convs):
branch_convs.append(
Bottleneck(
inplanes=self.conv_out_channels,
planes=self.conv_out_channels // 4,
conv_cfg=self.conv_cfg,
norm_cfg=self.norm_cfg))
return branch_convs
def _add_fc_branch(self):
"""Add the fc branch which consists of a sequential of fc layers"""
branch_fcs = nn.ModuleList()
for i in range(self.num_fcs):
fc_in_channels = (
self.in_channels *
self.roi_feat_area if i == 0 else self.fc_out_channels)
branch_fcs.append(nn.Linear(fc_in_channels, self.fc_out_channels))
return branch_fcs
def init_weights(self):
normal_init(self.fc_cls, std=0.01)
normal_init(self.fc_reg, std=0.001)
for m in self.fc_branch.modules():
if isinstance(m, nn.Linear):
xavier_init(m, distribution='uniform')
def forward(self, x_cls, x_reg):
# conv head
x_conv = self.res_block(x_reg)
for conv in self.conv_branch:
x_conv = conv(x_conv)
if self.with_avg_pool:
x_conv = self.avg_pool(x_conv)
x_conv = x_conv.view(x_conv.size(0), -1)
bbox_pred = self.fc_reg(x_conv)
# fc head
x_fc = x_cls.view(x_cls.size(0), -1)
for fc in self.fc_branch:
x_fc = self.relu(fc(x_fc))
cls_score = self.fc_cls(x_fc)
return cls_score, bbox_pred
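# Usage sketch (illustrative; assumes 7x7 RoI features and the default
# num_classes=81): the double head takes two RoI feature maps, one per branch,
# which the detector may pool with different RoI scale factors:
# >>> head = DoubleConvFCBBoxHead(num_convs=4, num_fcs=2, in_channels=256)
# >>> x_cls = torch.rand(8, 256, 7, 7)
# >>> x_reg = torch.rand(8, 256, 7, 7)
# >>> cls_score, bbox_pred = head(x_cls, x_reg)  # (8, 81), (8, 324)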
from torch import nn
from mmdet.utils import build_from_cfg
from .registry import (BACKBONES, DETECTORS, HEADS, LOSSES, NECKS,
ROI_EXTRACTORS, SHARED_HEADS)
def build(cfg, registry, default_args=None):
if isinstance(cfg, list):
modules = [
build_from_cfg(cfg_, registry, default_args) for cfg_ in cfg
]
return nn.Sequential(*modules)
else:
return build_from_cfg(cfg, registry, default_args)
def build_backbone(cfg):
return build(cfg, BACKBONES)
def build_neck(cfg):
return build(cfg, NECKS)
def build_roi_extractor(cfg):
return build(cfg, ROI_EXTRACTORS)
def build_shared_head(cfg):
return build(cfg, SHARED_HEADS)
def build_head(cfg):
return build(cfg, HEADS)
def build_loss(cfg):
return build(cfg, LOSSES)
def build_detector(cfg, train_cfg=None, test_cfg=None):
return build(cfg, DETECTORS, dict(train_cfg=train_cfg, test_cfg=test_cfg))
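# Usage sketch for build() (illustrative; the cfg contents are hypothetical):
# a single dict is built directly via build_from_cfg, while a list of dicts
# yields an nn.Sequential of the built modules, e.g.
# >>> cfg = dict(type='SmoothL1Loss', beta=1.0, loss_weight=1.0)
# >>> loss = build(cfg, LOSSES)        # one module
# >>> seq = build([cfg, cfg], LOSSES)  # nn.Sequential of two modules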
from .atss import ATSS
from .base import BaseDetector
from .cascade_rcnn import CascadeRCNN
from .double_head_rcnn import DoubleHeadRCNN
from .fast_rcnn import FastRCNN
from .faster_rcnn import FasterRCNN
from .fcos import FCOS
from .fovea import FOVEA
from .grid_rcnn import GridRCNN
from .htc import HybridTaskCascade
from .mask_rcnn import MaskRCNN
from .mask_scoring_rcnn import MaskScoringRCNN
from .reppoints_detector import RepPointsDetector
from .retinanet import RetinaNet
from .rpn import RPN
from .single_stage import SingleStageDetector
from .single_stage_ins import SingleStageInsDetector
from .two_stage import TwoStageDetector
from .solo import SOLO
from .solov2 import SOLOv2
__all__ = [
'ATSS', 'BaseDetector', 'SingleStageDetector', 'TwoStageDetector', 'RPN',
'FastRCNN', 'FasterRCNN', 'MaskRCNN', 'CascadeRCNN', 'HybridTaskCascade',
'DoubleHeadRCNN', 'RetinaNet', 'FCOS', 'GridRCNN', 'MaskScoringRCNN',
'RepPointsDetector', 'FOVEA', 'SingleStageInsDetector', 'SOLO', 'SOLOv2'
]
from ..registry import DETECTORS
from .single_stage import SingleStageDetector
@DETECTORS.register_module
class ATSS(SingleStageDetector):
def __init__(self,
backbone,
neck,
bbox_head,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(ATSS, self).__init__(backbone, neck, bbox_head, train_cfg,
test_cfg, pretrained)
from abc import ABCMeta, abstractmethod
import mmcv
import numpy as np
import pycocotools.mask as maskUtils
import torch.nn as nn
from mmdet.core import auto_fp16, get_classes, tensor2imgs
from mmdet.utils import print_log
class BaseDetector(nn.Module, metaclass=ABCMeta):
"""Base class for detectors"""
def __init__(self):
super(BaseDetector, self).__init__()
self.fp16_enabled = False
@property
def with_neck(self):
return hasattr(self, 'neck') and self.neck is not None
@property
def with_mask_feat_head(self):
return hasattr(self, 'mask_feat_head') and \
self.mask_feat_head is not None
@property
def with_shared_head(self):
return hasattr(self, 'shared_head') and self.shared_head is not None
@property
def with_bbox(self):
return hasattr(self, 'bbox_head') and self.bbox_head is not None
@property
def with_mask(self):
return hasattr(self, 'mask_head') and self.mask_head is not None
@abstractmethod
def extract_feat(self, imgs):
pass
def extract_feats(self, imgs):
assert isinstance(imgs, list)
for img in imgs:
yield self.extract_feat(img)
@abstractmethod
def forward_train(self, imgs, img_metas, **kwargs):
"""
Args:
            imgs (list[Tensor]): list of tensors of shape (1, C, H, W).
Typically these should be mean centered and std scaled.
img_metas (list[dict]): list of image info dict where each dict
has:
                'img_shape', 'scale_factor', 'flip', and may also contain
'filename', 'ori_shape', 'pad_shape', and 'img_norm_cfg'.
For details on the values of these keys see
`mmdet/datasets/pipelines/formatting.py:Collect`.
**kwargs: specific to concrete implementation
"""
pass
async def async_simple_test(self, img, img_meta, **kwargs):
raise NotImplementedError
@abstractmethod
def simple_test(self, img, img_meta, **kwargs):
pass
@abstractmethod
def aug_test(self, imgs, img_metas, **kwargs):
pass
def init_weights(self, pretrained=None):
if pretrained is not None:
print_log('load model from: {}'.format(pretrained), logger='root')
async def aforward_test(self, *, img, img_meta, **kwargs):
for var, name in [(img, 'img'), (img_meta, 'img_meta')]:
if not isinstance(var, list):
raise TypeError('{} must be a list, but got {}'.format(
name, type(var)))
num_augs = len(img)
if num_augs != len(img_meta):
raise ValueError(
'num of augmentations ({}) != num of image meta ({})'.format(
len(img), len(img_meta)))
# TODO: remove the restriction of imgs_per_gpu == 1 when prepared
imgs_per_gpu = img[0].size(0)
assert imgs_per_gpu == 1
if num_augs == 1:
return await self.async_simple_test(img[0], img_meta[0], **kwargs)
else:
raise NotImplementedError
def forward_test(self, imgs, img_metas, **kwargs):
"""
Args:
imgs (List[Tensor]): the outer list indicates test-time
augmentations and inner Tensor should have a shape NxCxHxW,
which contains all images in the batch.
            img_metas (List[List[dict]]): the outer list indicates test-time
augs (multiscale, flip, etc.) and the inner list indicates
images in a batch
"""
for var, name in [(imgs, 'imgs'), (img_metas, 'img_metas')]:
if not isinstance(var, list):
raise TypeError('{} must be a list, but got {}'.format(
name, type(var)))
num_augs = len(imgs)
if num_augs != len(img_metas):
raise ValueError(
'num of augmentations ({}) != num of image meta ({})'.format(
len(imgs), len(img_metas)))
# TODO: remove the restriction of imgs_per_gpu == 1 when prepared
imgs_per_gpu = imgs[0].size(0)
assert imgs_per_gpu == 1
if num_augs == 1:
return self.simple_test(imgs[0], img_metas[0], **kwargs)
else:
return self.aug_test(imgs, img_metas, **kwargs)
@auto_fp16(apply_to=('img', ))
def forward(self, img, img_meta, return_loss=True, **kwargs):
"""
Calls either forward_train or forward_test depending on whether
return_loss=True. Note this setting will change the expected inputs.
When `return_loss=True`, img and img_meta are single-nested (i.e.
        Tensor and List[dict]), and when `return_loss=False`, img and img_meta
should be double nested (i.e. List[Tensor], List[List[dict]]), with
the outer list indicating test time augmentations.
"""
if return_loss:
return self.forward_train(img, img_meta, **kwargs)
else:
return self.forward_test(img, img_meta, **kwargs)
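    # Input-nesting sketch for forward() (a restatement of the docstring):
    #   train: forward(img=Tensor(N, C, H, W), img_meta=[dict, ...])
    #   test:  forward(img=[Tensor(1, C, H, W), ...],
    #                  img_meta=[[dict], ...], return_loss=False)
    # where the outer lists enumerate test-time augmentations.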
def show_result(self, data, result, dataset=None, score_thr=0.3):
if isinstance(result, tuple):
bbox_result, segm_result = result
else:
bbox_result, segm_result = result, None
img_tensor = data['img'][0]
img_metas = data['img_meta'][0].data[0]
imgs = tensor2imgs(img_tensor, **img_metas[0]['img_norm_cfg'])
assert len(imgs) == len(img_metas)
if dataset is None:
class_names = self.CLASSES
elif isinstance(dataset, str):
class_names = get_classes(dataset)
elif isinstance(dataset, (list, tuple)):
class_names = dataset
else:
raise TypeError(
'dataset must be a valid dataset name or a sequence'
' of class names, not {}'.format(type(dataset)))
for img, img_meta in zip(imgs, img_metas):
h, w, _ = img_meta['img_shape']
img_show = img[:h, :w, :]
bboxes = np.vstack(bbox_result)
# draw segmentation masks
if segm_result is not None:
segms = mmcv.concat_list(segm_result)
inds = np.where(bboxes[:, -1] > score_thr)[0]
for i in inds:
color_mask = np.random.randint(
0, 256, (1, 3), dtype=np.uint8)
                    mask = maskUtils.decode(segms[i]).astype(bool)
img_show[mask] = img_show[mask] * 0.5 + color_mask * 0.5
# draw bounding boxes
labels = [
np.full(bbox.shape[0], i, dtype=np.int32)
for i, bbox in enumerate(bbox_result)
]
labels = np.concatenate(labels)
mmcv.imshow_det_bboxes(
img_show,
bboxes,
labels,
class_names=class_names,
score_thr=score_thr)
from __future__ import division
import torch
import torch.nn as nn
from mmdet.core import (bbox2result, bbox2roi, bbox_mapping, build_assigner,
build_sampler, merge_aug_bboxes, merge_aug_masks,
multiclass_nms)
from .. import builder
from ..registry import DETECTORS
from .base import BaseDetector
from .test_mixins import RPNTestMixin
@DETECTORS.register_module
class CascadeRCNN(BaseDetector, RPNTestMixin):
def __init__(self,
num_stages,
backbone,
neck=None,
shared_head=None,
rpn_head=None,
bbox_roi_extractor=None,
bbox_head=None,
mask_roi_extractor=None,
mask_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
assert bbox_roi_extractor is not None
assert bbox_head is not None
super(CascadeRCNN, self).__init__()
self.num_stages = num_stages
self.backbone = builder.build_backbone(backbone)
if neck is not None:
self.neck = builder.build_neck(neck)
if rpn_head is not None:
self.rpn_head = builder.build_head(rpn_head)
if shared_head is not None:
self.shared_head = builder.build_shared_head(shared_head)
if bbox_head is not None:
self.bbox_roi_extractor = nn.ModuleList()
self.bbox_head = nn.ModuleList()
if not isinstance(bbox_roi_extractor, list):
bbox_roi_extractor = [
bbox_roi_extractor for _ in range(num_stages)
]
if not isinstance(bbox_head, list):
bbox_head = [bbox_head for _ in range(num_stages)]
assert len(bbox_roi_extractor) == len(bbox_head) == self.num_stages
for roi_extractor, head in zip(bbox_roi_extractor, bbox_head):
self.bbox_roi_extractor.append(
builder.build_roi_extractor(roi_extractor))
self.bbox_head.append(builder.build_head(head))
if mask_head is not None:
self.mask_head = nn.ModuleList()
if not isinstance(mask_head, list):
mask_head = [mask_head for _ in range(num_stages)]
assert len(mask_head) == self.num_stages
for head in mask_head:
self.mask_head.append(builder.build_head(head))
if mask_roi_extractor is not None:
self.share_roi_extractor = False
self.mask_roi_extractor = nn.ModuleList()
if not isinstance(mask_roi_extractor, list):
mask_roi_extractor = [
mask_roi_extractor for _ in range(num_stages)
]
assert len(mask_roi_extractor) == self.num_stages
for roi_extractor in mask_roi_extractor:
self.mask_roi_extractor.append(
builder.build_roi_extractor(roi_extractor))
else:
self.share_roi_extractor = True
self.mask_roi_extractor = self.bbox_roi_extractor
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.init_weights(pretrained=pretrained)
@property
def with_rpn(self):
return hasattr(self, 'rpn_head') and self.rpn_head is not None
def init_weights(self, pretrained=None):
super(CascadeRCNN, self).init_weights(pretrained)
self.backbone.init_weights(pretrained=pretrained)
if self.with_neck:
if isinstance(self.neck, nn.Sequential):
for m in self.neck:
m.init_weights()
else:
self.neck.init_weights()
if self.with_rpn:
self.rpn_head.init_weights()
if self.with_shared_head:
self.shared_head.init_weights(pretrained=pretrained)
for i in range(self.num_stages):
if self.with_bbox:
self.bbox_roi_extractor[i].init_weights()
self.bbox_head[i].init_weights()
if self.with_mask:
if not self.share_roi_extractor:
self.mask_roi_extractor[i].init_weights()
self.mask_head[i].init_weights()
def extract_feat(self, img):
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
return x
def forward_dummy(self, img):
outs = ()
# backbone
x = self.extract_feat(img)
# rpn
if self.with_rpn:
rpn_outs = self.rpn_head(x)
outs = outs + (rpn_outs, )
proposals = torch.randn(1000, 4).cuda()
# bbox heads
rois = bbox2roi([proposals])
if self.with_bbox:
for i in range(self.num_stages):
bbox_feats = self.bbox_roi_extractor[i](
x[:self.bbox_roi_extractor[i].num_inputs], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = self.bbox_head[i](bbox_feats)
outs = outs + (cls_score, bbox_pred)
# mask heads
if self.with_mask:
mask_rois = rois[:100]
for i in range(self.num_stages):
mask_feats = self.mask_roi_extractor[i](
x[:self.mask_roi_extractor[i].num_inputs], mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head[i](mask_feats)
outs = outs + (mask_pred, )
return outs
def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
proposals=None):
"""
Args:
img (Tensor): of shape (N, C, H, W) encoding input images.
Typically these should be mean centered and std scaled.
img_meta (list[dict]): list of image info dict where each dict has:
                'img_shape', 'scale_factor', 'flip', and may also contain
'filename', 'ori_shape', 'pad_shape', and 'img_norm_cfg'.
For details on the values of these keys see
`mmdet/datasets/pipelines/formatting.py:Collect`.
gt_bboxes (list[Tensor]): each item are the truth boxes for each
image in [tl_x, tl_y, br_x, br_y] format.
gt_labels (list[Tensor]): class indices corresponding to each box
gt_bboxes_ignore (None | list[Tensor]): specify which bounding
boxes can be ignored when computing the loss.
gt_masks (None | Tensor) : true segmentation masks for each box
used if the architecture supports a segmentation task.
proposals : override rpn proposals with custom proposals. Use when
`with_rpn` is False.
Returns:
dict[str, Tensor]: a dictionary of loss components
"""
x = self.extract_feat(img)
losses = dict()
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('rpn_proposal',
self.test_cfg.rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
for i in range(self.num_stages):
self.current_stage = i
rcnn_train_cfg = self.train_cfg.rcnn[i]
lw = self.train_cfg.stage_loss_weights[i]
# assign gts and sample proposals
sampling_results = []
if self.with_bbox or self.with_mask:
bbox_assigner = build_assigner(rcnn_train_cfg.assigner)
bbox_sampler = build_sampler(
rcnn_train_cfg.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
for j in range(num_imgs):
assign_result = bbox_assigner.assign(
proposal_list[j], gt_bboxes[j], gt_bboxes_ignore[j],
gt_labels[j])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[j],
gt_bboxes[j],
gt_labels[j],
feats=[lvl_feat[j][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
bbox_roi_extractor = self.bbox_roi_extractor[i]
bbox_head = self.bbox_head[i]
rois = bbox2roi([res.bboxes for res in sampling_results])
if len(rois) == 0:
# If there are no predicted and/or truth boxes, then we cannot
# compute head / mask losses
continue
bbox_feats = bbox_roi_extractor(x[:bbox_roi_extractor.num_inputs],
rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = bbox_head(bbox_feats)
bbox_targets = bbox_head.get_target(sampling_results, gt_bboxes,
gt_labels, rcnn_train_cfg)
loss_bbox = bbox_head.loss(cls_score, bbox_pred, *bbox_targets)
for name, value in loss_bbox.items():
losses['s{}.{}'.format(i, name)] = (
value * lw if 'loss' in name else value)
# mask head forward and loss
if self.with_mask:
if not self.share_roi_extractor:
mask_roi_extractor = self.mask_roi_extractor[i]
pos_rois = bbox2roi(
[res.pos_bboxes for res in sampling_results])
mask_feats = mask_roi_extractor(
x[:mask_roi_extractor.num_inputs], pos_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
else:
# reuse positive bbox feats
pos_inds = []
device = bbox_feats.device
for res in sampling_results:
pos_inds.append(
torch.ones(
res.pos_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds.append(
torch.zeros(
res.neg_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds = torch.cat(pos_inds)
mask_feats = bbox_feats[pos_inds]
mask_head = self.mask_head[i]
mask_pred = mask_head(mask_feats)
mask_targets = mask_head.get_target(sampling_results, gt_masks,
rcnn_train_cfg)
pos_labels = torch.cat(
[res.pos_gt_labels for res in sampling_results])
loss_mask = mask_head.loss(mask_pred, mask_targets, pos_labels)
for name, value in loss_mask.items():
losses['s{}.{}'.format(i, name)] = (
value * lw if 'loss' in name else value)
# refine bboxes
if i < self.num_stages - 1:
pos_is_gts = [res.pos_is_gt for res in sampling_results]
roi_labels = bbox_targets[0] # bbox_targets is a tuple
with torch.no_grad():
proposal_list = bbox_head.refine_bboxes(
rois, roi_labels, bbox_pred, pos_is_gts, img_meta)
return losses
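    # Loss naming sketch for forward_train() (descriptive, assuming e.g.
    # num_stages=3 and stage_loss_weights=[1, 0.5, 0.25] as in common cascade
    # configs): entries come out as 's0.loss_cls', 's0.loss_bbox',
    # 's1.loss_cls', ..., with each stage's 'loss*' values already scaled by
    # its weight lw, while non-loss entries such as 's0.acc' stay unscaled.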
def simple_test(self, img, img_meta, proposals=None, rescale=False):
"""Run inference on a single image.
Args:
img (Tensor): must be in shape (N, C, H, W)
img_meta (list[dict]): a list with one dictionary element.
See `mmdet/datasets/pipelines/formatting.py:Collect` for
details of meta dicts.
proposals : if specified overrides rpn proposals
rescale (bool): if True returns boxes in original image space
Returns:
dict: results
"""
x = self.extract_feat(img)
proposal_list = self.simple_test_rpn(
x, img_meta, self.test_cfg.rpn) if proposals is None else proposals
img_shape = img_meta[0]['img_shape']
ori_shape = img_meta[0]['ori_shape']
scale_factor = img_meta[0]['scale_factor']
# "ms" in variable names means multi-stage
ms_bbox_result = {}
ms_segm_result = {}
ms_scores = []
rcnn_test_cfg = self.test_cfg.rcnn
rois = bbox2roi(proposal_list)
for i in range(self.num_stages):
bbox_roi_extractor = self.bbox_roi_extractor[i]
bbox_head = self.bbox_head[i]
bbox_feats = bbox_roi_extractor(
x[:len(bbox_roi_extractor.featmap_strides)], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = bbox_head(bbox_feats)
ms_scores.append(cls_score)
if i < self.num_stages - 1:
bbox_label = cls_score.argmax(dim=1)
rois = bbox_head.regress_by_class(rois, bbox_label, bbox_pred,
img_meta[0])
cls_score = sum(ms_scores) / self.num_stages
det_bboxes, det_labels = self.bbox_head[-1].get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
bbox_result = bbox2result(det_bboxes, det_labels,
self.bbox_head[-1].num_classes)
ms_bbox_result['ensemble'] = bbox_result
if self.with_mask:
if det_bboxes.shape[0] == 0:
mask_classes = self.mask_head[-1].num_classes - 1
segm_result = [[] for _ in range(mask_classes)]
else:
if isinstance(scale_factor, float): # aspect ratio fixed
_bboxes = (
det_bboxes[:, :4] *
scale_factor if rescale else det_bboxes)
else:
_bboxes = (
det_bboxes[:, :4] *
torch.from_numpy(scale_factor).to(det_bboxes.device)
if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
aug_masks = []
for i in range(self.num_stages):
mask_roi_extractor = self.mask_roi_extractor[i]
mask_feats = mask_roi_extractor(
x[:len(mask_roi_extractor.featmap_strides)], mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head[i](mask_feats)
aug_masks.append(mask_pred.sigmoid().cpu().numpy())
merged_masks = merge_aug_masks(aug_masks,
[img_meta] * self.num_stages,
self.test_cfg.rcnn)
segm_result = self.mask_head[-1].get_seg_masks(
merged_masks, _bboxes, det_labels, rcnn_test_cfg,
ori_shape, scale_factor, rescale)
ms_segm_result['ensemble'] = segm_result
if self.with_mask:
results = (ms_bbox_result['ensemble'], ms_segm_result['ensemble'])
else:
results = ms_bbox_result['ensemble']
return results
def aug_test(self, imgs, img_metas, proposals=None, rescale=False):
"""Test with augmentations.
If rescale is False, then returned bboxes and masks will fit the scale
of imgs[0].
"""
# recompute feats to save memory
proposal_list = self.aug_test_rpn(
self.extract_feats(imgs), img_metas, self.test_cfg.rpn)
rcnn_test_cfg = self.test_cfg.rcnn
aug_bboxes = []
aug_scores = []
for x, img_meta in zip(self.extract_feats(imgs), img_metas):
# only one image in the batch
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
proposals = bbox_mapping(proposal_list[0][:, :4], img_shape,
scale_factor, flip)
# "ms" in variable names means multi-stage
ms_scores = []
rois = bbox2roi([proposals])
for i in range(self.num_stages):
bbox_roi_extractor = self.bbox_roi_extractor[i]
bbox_head = self.bbox_head[i]
bbox_feats = bbox_roi_extractor(
x[:len(bbox_roi_extractor.featmap_strides)], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = bbox_head(bbox_feats)
ms_scores.append(cls_score)
if i < self.num_stages - 1:
bbox_label = cls_score.argmax(dim=1)
rois = bbox_head.regress_by_class(rois, bbox_label,
bbox_pred, img_meta[0])
cls_score = sum(ms_scores) / float(len(ms_scores))
bboxes, scores = self.bbox_head[-1].get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=False,
cfg=None)
aug_bboxes.append(bboxes)
aug_scores.append(scores)
# after merging, bboxes will be rescaled to the original image size
merged_bboxes, merged_scores = merge_aug_bboxes(
aug_bboxes, aug_scores, img_metas, rcnn_test_cfg)
det_bboxes, det_labels = multiclass_nms(merged_bboxes, merged_scores,
rcnn_test_cfg.score_thr,
rcnn_test_cfg.nms,
rcnn_test_cfg.max_per_img)
bbox_result = bbox2result(det_bboxes, det_labels,
self.bbox_head[-1].num_classes)
if self.with_mask:
if det_bboxes.shape[0] == 0:
segm_result = [[]
for _ in range(self.mask_head[-1].num_classes -
1)]
else:
aug_masks = []
aug_img_metas = []
for x, img_meta in zip(self.extract_feats(imgs), img_metas):
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
_bboxes = bbox_mapping(det_bboxes[:, :4], img_shape,
scale_factor, flip)
mask_rois = bbox2roi([_bboxes])
for i in range(self.num_stages):
mask_feats = self.mask_roi_extractor[i](
x[:len(self.mask_roi_extractor[i].featmap_strides
)], mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head[i](mask_feats)
aug_masks.append(mask_pred.sigmoid().cpu().numpy())
aug_img_metas.append(img_meta)
merged_masks = merge_aug_masks(aug_masks, aug_img_metas,
self.test_cfg.rcnn)
ori_shape = img_metas[0][0]['ori_shape']
segm_result = self.mask_head[-1].get_seg_masks(
merged_masks,
det_bboxes,
det_labels,
rcnn_test_cfg,
ori_shape,
scale_factor=1.0,
rescale=False)
return bbox_result, segm_result
else:
return bbox_result
def show_result(self, data, result, **kwargs):
if self.with_mask:
ms_bbox_result, ms_segm_result = result
if isinstance(ms_bbox_result, dict):
result = (ms_bbox_result['ensemble'],
ms_segm_result['ensemble'])
else:
if isinstance(result, dict):
result = result['ensemble']
super(CascadeRCNN, self).show_result(data, result, **kwargs)
import torch
from mmdet.core import bbox2roi, build_assigner, build_sampler
from ..registry import DETECTORS
from .two_stage import TwoStageDetector
@DETECTORS.register_module
class DoubleHeadRCNN(TwoStageDetector):
def __init__(self, reg_roi_scale_factor, **kwargs):
super().__init__(**kwargs)
self.reg_roi_scale_factor = reg_roi_scale_factor
def forward_dummy(self, img):
outs = ()
# backbone
x = self.extract_feat(img)
# rpn
if self.with_rpn:
rpn_outs = self.rpn_head(x)
outs = outs + (rpn_outs, )
proposals = torch.randn(1000, 4).cuda()
# bbox head
rois = bbox2roi([proposals])
bbox_cls_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
bbox_reg_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs],
rois,
roi_scale_factor=self.reg_roi_scale_factor)
if self.with_shared_head:
bbox_cls_feats = self.shared_head(bbox_cls_feats)
bbox_reg_feats = self.shared_head(bbox_reg_feats)
cls_score, bbox_pred = self.bbox_head(bbox_cls_feats, bbox_reg_feats)
outs += (cls_score, bbox_pred)
return outs
def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
proposals=None):
x = self.extract_feat(img)
losses = dict()
# RPN forward and loss
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('rpn_proposal',
self.test_cfg.rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
# assign gts and sample proposals
if self.with_bbox or self.with_mask:
bbox_assigner = build_assigner(self.train_cfg.rcnn.assigner)
bbox_sampler = build_sampler(
self.train_cfg.rcnn.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
sampling_results = []
for i in range(num_imgs):
assign_result = bbox_assigner.assign(proposal_list[i],
gt_bboxes[i],
gt_bboxes_ignore[i],
gt_labels[i])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[i],
gt_bboxes[i],
gt_labels[i],
feats=[lvl_feat[i][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
if self.with_bbox:
rois = bbox2roi([res.bboxes for res in sampling_results])
# TODO: a more flexible way to decide which feature maps to use
bbox_cls_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
bbox_reg_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs],
rois,
roi_scale_factor=self.reg_roi_scale_factor)
if self.with_shared_head:
bbox_cls_feats = self.shared_head(bbox_cls_feats)
bbox_reg_feats = self.shared_head(bbox_reg_feats)
cls_score, bbox_pred = self.bbox_head(bbox_cls_feats,
bbox_reg_feats)
bbox_targets = self.bbox_head.get_target(sampling_results,
gt_bboxes, gt_labels,
self.train_cfg.rcnn)
loss_bbox = self.bbox_head.loss(cls_score, bbox_pred,
*bbox_targets)
losses.update(loss_bbox)
# mask head forward and loss
if self.with_mask:
if not self.share_roi_extractor:
pos_rois = bbox2roi(
[res.pos_bboxes for res in sampling_results])
mask_feats = self.mask_roi_extractor(
x[:self.mask_roi_extractor.num_inputs], pos_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
else:
pos_inds = []
device = bbox_cls_feats.device
for res in sampling_results:
pos_inds.append(
torch.ones(
res.pos_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds.append(
torch.zeros(
res.neg_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds = torch.cat(pos_inds)
mask_feats = bbox_cls_feats[pos_inds]
mask_pred = self.mask_head(mask_feats)
mask_targets = self.mask_head.get_target(sampling_results,
gt_masks,
self.train_cfg.rcnn)
pos_labels = torch.cat(
[res.pos_gt_labels for res in sampling_results])
loss_mask = self.mask_head.loss(mask_pred, mask_targets,
pos_labels)
losses.update(loss_mask)
return losses
def simple_test_bboxes(self,
x,
img_meta,
proposals,
rcnn_test_cfg,
rescale=False):
"""Test only det bboxes without augmentation."""
rois = bbox2roi(proposals)
bbox_cls_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
bbox_reg_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs],
rois,
roi_scale_factor=self.reg_roi_scale_factor)
if self.with_shared_head:
bbox_cls_feats = self.shared_head(bbox_cls_feats)
bbox_reg_feats = self.shared_head(bbox_reg_feats)
cls_score, bbox_pred = self.bbox_head(bbox_cls_feats, bbox_reg_feats)
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
det_bboxes, det_labels = self.bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
return det_bboxes, det_labels
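# Design note (descriptive): the same bbox_roi_extractor is applied twice in
# the methods above, once plainly for the classification branch and once with
# roi_scale_factor=self.reg_roi_scale_factor for the regression branch, so
# regression features are pooled from rescaled RoIs (a typical Double-Head
# config uses reg_roi_scale_factor=1.3) while the heads themselves are the
# DoubleConvFCBBoxHead defined earlier.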
from ..registry import DETECTORS
from .two_stage import TwoStageDetector
@DETECTORS.register_module
class FastRCNN(TwoStageDetector):
def __init__(self,
backbone,
bbox_roi_extractor,
bbox_head,
train_cfg,
test_cfg,
neck=None,
shared_head=None,
mask_roi_extractor=None,
mask_head=None,
pretrained=None):
super(FastRCNN, self).__init__(
backbone=backbone,
neck=neck,
shared_head=shared_head,
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
mask_roi_extractor=mask_roi_extractor,
mask_head=mask_head,
pretrained=pretrained)
def forward_test(self, imgs, img_metas, proposals, **kwargs):
"""
Args:
imgs (List[Tensor]): the outer list indicates test-time
augmentations and inner Tensor should have a shape NxCxHxW,
which contains all images in the batch.
            img_metas (List[List[dict]]): the outer list indicates test-time
augs (multiscale, flip, etc.) and the inner list indicates
images in a batch
            proposals (List[List[Tensor | None]]): predefined proposals for
                each test-time augmentation and each batch item.
"""
for var, name in [(imgs, 'imgs'), (img_metas, 'img_metas')]:
if not isinstance(var, list):
raise TypeError('{} must be a list, but got {}'.format(
name, type(var)))
num_augs = len(imgs)
if num_augs != len(img_metas):
raise ValueError(
'num of augmentations ({}) != num of image meta ({})'.format(
len(imgs), len(img_metas)))
# TODO: remove the restriction of imgs_per_gpu == 1 when prepared
imgs_per_gpu = imgs[0].size(0)
assert imgs_per_gpu == 1
if num_augs == 1:
return self.simple_test(imgs[0], img_metas[0], proposals[0],
**kwargs)
else:
return self.aug_test(imgs, img_metas, proposals, **kwargs)
from ..registry import DETECTORS
from .two_stage import TwoStageDetector
@DETECTORS.register_module
class FasterRCNN(TwoStageDetector):
def __init__(self,
backbone,
rpn_head,
bbox_roi_extractor,
bbox_head,
train_cfg,
test_cfg,
neck=None,
shared_head=None,
pretrained=None):
super(FasterRCNN, self).__init__(
backbone=backbone,
neck=neck,
shared_head=shared_head,
rpn_head=rpn_head,
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained)
from ..registry import DETECTORS
from .single_stage import SingleStageDetector
@DETECTORS.register_module
class FCOS(SingleStageDetector):
def __init__(self,
backbone,
neck,
bbox_head,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(FCOS, self).__init__(backbone, neck, bbox_head, train_cfg,
test_cfg, pretrained)
from ..registry import DETECTORS
from .single_stage import SingleStageDetector
@DETECTORS.register_module
class FOVEA(SingleStageDetector):
def __init__(self,
backbone,
neck,
bbox_head,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(FOVEA, self).__init__(backbone, neck, bbox_head, train_cfg,
test_cfg, pretrained)
import torch
from mmdet.core import bbox2result, bbox2roi, build_assigner, build_sampler
from .. import builder
from ..registry import DETECTORS
from .two_stage import TwoStageDetector
@DETECTORS.register_module
class GridRCNN(TwoStageDetector):
"""Grid R-CNN.
This detector is the implementation of:
- Grid R-CNN (https://arxiv.org/abs/1811.12030)
- Grid R-CNN Plus: Faster and Better (https://arxiv.org/abs/1906.05688)
"""
def __init__(self,
backbone,
rpn_head,
bbox_roi_extractor,
bbox_head,
grid_roi_extractor,
grid_head,
train_cfg,
test_cfg,
neck=None,
shared_head=None,
pretrained=None):
assert grid_head is not None
super(GridRCNN, self).__init__(
backbone=backbone,
neck=neck,
shared_head=shared_head,
rpn_head=rpn_head,
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained)
if grid_roi_extractor is not None:
self.grid_roi_extractor = builder.build_roi_extractor(
grid_roi_extractor)
self.share_roi_extractor = False
else:
self.share_roi_extractor = True
self.grid_roi_extractor = self.bbox_roi_extractor
self.grid_head = builder.build_head(grid_head)
self.init_extra_weights()
def init_extra_weights(self):
self.grid_head.init_weights()
if not self.share_roi_extractor:
self.grid_roi_extractor.init_weights()
def _random_jitter(self, sampling_results, img_metas, amplitude=0.15):
"""Ramdom jitter positive proposals for training."""
for sampling_result, img_meta in zip(sampling_results, img_metas):
bboxes = sampling_result.pos_bboxes
random_offsets = bboxes.new_empty(bboxes.shape[0], 4).uniform_(
-amplitude, amplitude)
# before jittering
cxcy = (bboxes[:, 2:4] + bboxes[:, :2]) / 2
wh = (bboxes[:, 2:4] - bboxes[:, :2]).abs()
# after jittering
new_cxcy = cxcy + wh * random_offsets[:, :2]
new_wh = wh * (1 + random_offsets[:, 2:])
# xywh to xyxy
new_x1y1 = (new_cxcy - new_wh / 2)
new_x2y2 = (new_cxcy + new_wh / 2)
new_bboxes = torch.cat([new_x1y1, new_x2y2], dim=1)
# clip bboxes
max_shape = img_meta['img_shape']
if max_shape is not None:
new_bboxes[:, 0::2].clamp_(min=0, max=max_shape[1] - 1)
new_bboxes[:, 1::2].clamp_(min=0, max=max_shape[0] - 1)
sampling_result.pos_bboxes = new_bboxes
return sampling_results
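    # Worked jitter example (illustrative numbers): for a positive box
    # [x1, y1, x2, y2] = [10, 10, 50, 30], cxcy = (30, 20) and wh = (40, 20).
    # With random_offsets = [0.1, -0.05, 0.15, 0.0] the jittered box has
    # new_cxcy = (30 + 40 * 0.1, 20 + 20 * -0.05) = (34, 19) and
    # new_wh = (40 * 1.15, 20 * 1.0) = (46, 20), i.e. [11, 9, 57, 29],
    # which is then clipped to the image shape.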
def forward_dummy(self, img):
outs = ()
# backbone
x = self.extract_feat(img)
# rpn
if self.with_rpn:
rpn_outs = self.rpn_head(x)
outs = outs + (rpn_outs, )
proposals = torch.randn(1000, 4).cuda()
# bbox head
rois = bbox2roi([proposals])
bbox_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = self.bbox_head(bbox_feats)
# grid head
grid_rois = rois[:100]
grid_feats = self.grid_roi_extractor(
x[:self.grid_roi_extractor.num_inputs], grid_rois)
if self.with_shared_head:
grid_feats = self.shared_head(grid_feats)
grid_pred = self.grid_head(grid_feats)
return rpn_outs, cls_score, bbox_pred, grid_pred
def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
proposals=None):
x = self.extract_feat(img)
losses = dict()
# RPN forward and loss
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('rpn_proposal',
self.test_cfg.rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
if self.with_bbox:
# assign gts and sample proposals
bbox_assigner = build_assigner(self.train_cfg.rcnn.assigner)
bbox_sampler = build_sampler(
self.train_cfg.rcnn.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
sampling_results = []
for i in range(num_imgs):
assign_result = bbox_assigner.assign(proposal_list[i],
gt_bboxes[i],
gt_bboxes_ignore[i],
gt_labels[i])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[i],
gt_bboxes[i],
gt_labels[i],
feats=[lvl_feat[i][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
rois = bbox2roi([res.bboxes for res in sampling_results])
# TODO: a more flexible way to decide which feature maps to use
bbox_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = self.bbox_head(bbox_feats)
bbox_targets = self.bbox_head.get_target(sampling_results,
gt_bboxes, gt_labels,
self.train_cfg.rcnn)
loss_bbox = self.bbox_head.loss(cls_score, bbox_pred,
*bbox_targets)
losses.update(loss_bbox)
# Grid head forward and loss
sampling_results = self._random_jitter(sampling_results, img_meta)
pos_rois = bbox2roi([res.pos_bboxes for res in sampling_results])
grid_feats = self.grid_roi_extractor(
x[:self.grid_roi_extractor.num_inputs], pos_rois)
if self.with_shared_head:
grid_feats = self.shared_head(grid_feats)
# Accelerate training
max_sample_num_grid = self.train_cfg.rcnn.get('max_num_grid', 192)
            sample_idx = torch.randperm(
                grid_feats.shape[0])[:min(grid_feats.shape[0],
                                          max_sample_num_grid)]
grid_feats = grid_feats[sample_idx]
grid_pred = self.grid_head(grid_feats)
grid_targets = self.grid_head.get_target(sampling_results,
self.train_cfg.rcnn)
grid_targets = grid_targets[sample_idx]
loss_grid = self.grid_head.loss(grid_pred, grid_targets)
losses.update(loss_grid)
return losses
def simple_test(self, img, img_meta, proposals=None, rescale=False):
"""Test without augmentation."""
assert self.with_bbox, "Bbox head must be implemented."
x = self.extract_feat(img)
proposal_list = self.simple_test_rpn(
x, img_meta, self.test_cfg.rpn) if proposals is None else proposals
det_bboxes, det_labels = self.simple_test_bboxes(
x, img_meta, proposal_list, self.test_cfg.rcnn, rescale=False)
        # pack det bboxes into rois for the grid head
grid_rois = bbox2roi([det_bboxes[:, :4]])
grid_feats = self.grid_roi_extractor(
x[:len(self.grid_roi_extractor.featmap_strides)], grid_rois)
if grid_rois.shape[0] != 0:
self.grid_head.test_mode = True
grid_pred = self.grid_head(grid_feats)
det_bboxes = self.grid_head.get_bboxes(det_bboxes,
grid_pred['fused'],
img_meta)
if rescale:
det_bboxes[:, :4] /= img_meta[0]['scale_factor']
else:
det_bboxes = torch.Tensor([])
bbox_results = bbox2result(det_bboxes, det_labels,
self.bbox_head.num_classes)
return bbox_results
import torch
import torch.nn.functional as F
from mmdet.core import (bbox2result, bbox2roi, bbox_mapping, build_assigner,
build_sampler, merge_aug_bboxes, merge_aug_masks,
multiclass_nms)
from .. import builder
from ..registry import DETECTORS
from .cascade_rcnn import CascadeRCNN
@DETECTORS.register_module
class HybridTaskCascade(CascadeRCNN):
def __init__(self,
num_stages,
backbone,
semantic_roi_extractor=None,
semantic_head=None,
semantic_fusion=('bbox', 'mask'),
interleaved=True,
mask_info_flow=True,
**kwargs):
super(HybridTaskCascade, self).__init__(num_stages, backbone, **kwargs)
assert self.with_bbox and self.with_mask
assert not self.with_shared_head # shared head not supported
if semantic_head is not None:
self.semantic_roi_extractor = builder.build_roi_extractor(
semantic_roi_extractor)
self.semantic_head = builder.build_head(semantic_head)
self.semantic_fusion = semantic_fusion
self.interleaved = interleaved
self.mask_info_flow = mask_info_flow
@property
def with_semantic(self):
        return hasattr(self, 'semantic_head') and \
            self.semantic_head is not None
def _bbox_forward_train(self,
stage,
x,
sampling_results,
gt_bboxes,
gt_labels,
rcnn_train_cfg,
semantic_feat=None):
rois = bbox2roi([res.bboxes for res in sampling_results])
bbox_roi_extractor = self.bbox_roi_extractor[stage]
bbox_head = self.bbox_head[stage]
bbox_feats = bbox_roi_extractor(x[:bbox_roi_extractor.num_inputs],
rois)
# semantic feature fusion
# element-wise sum for original features and pooled semantic features
if self.with_semantic and 'bbox' in self.semantic_fusion:
bbox_semantic_feat = self.semantic_roi_extractor([semantic_feat],
rois)
if bbox_semantic_feat.shape[-2:] != bbox_feats.shape[-2:]:
bbox_semantic_feat = F.adaptive_avg_pool2d(
bbox_semantic_feat, bbox_feats.shape[-2:])
bbox_feats += bbox_semantic_feat
cls_score, bbox_pred = bbox_head(bbox_feats)
bbox_targets = bbox_head.get_target(sampling_results, gt_bboxes,
gt_labels, rcnn_train_cfg)
loss_bbox = bbox_head.loss(cls_score, bbox_pred, *bbox_targets)
return loss_bbox, rois, bbox_targets, bbox_pred
def _mask_forward_train(self,
stage,
x,
sampling_results,
gt_masks,
rcnn_train_cfg,
semantic_feat=None):
mask_roi_extractor = self.mask_roi_extractor[stage]
mask_head = self.mask_head[stage]
pos_rois = bbox2roi([res.pos_bboxes for res in sampling_results])
mask_feats = mask_roi_extractor(x[:mask_roi_extractor.num_inputs],
pos_rois)
# semantic feature fusion
# element-wise sum for original features and pooled semantic features
if self.with_semantic and 'mask' in self.semantic_fusion:
mask_semantic_feat = self.semantic_roi_extractor([semantic_feat],
pos_rois)
if mask_semantic_feat.shape[-2:] != mask_feats.shape[-2:]:
mask_semantic_feat = F.adaptive_avg_pool2d(
mask_semantic_feat, mask_feats.shape[-2:])
mask_feats += mask_semantic_feat
# mask information flow
# forward all previous mask heads to obtain last_feat, and fuse it
# with the normal mask feature
if self.mask_info_flow:
last_feat = None
for i in range(stage):
last_feat = self.mask_head[i](
mask_feats, last_feat, return_logits=False)
mask_pred = mask_head(mask_feats, last_feat, return_feat=False)
else:
mask_pred = mask_head(mask_feats)
mask_targets = mask_head.get_target(sampling_results, gt_masks,
rcnn_train_cfg)
pos_labels = torch.cat([res.pos_gt_labels for res in sampling_results])
loss_mask = mask_head.loss(mask_pred, mask_targets, pos_labels)
return loss_mask
def _bbox_forward_test(self, stage, x, rois, semantic_feat=None):
bbox_roi_extractor = self.bbox_roi_extractor[stage]
bbox_head = self.bbox_head[stage]
bbox_feats = bbox_roi_extractor(
x[:len(bbox_roi_extractor.featmap_strides)], rois)
if self.with_semantic and 'bbox' in self.semantic_fusion:
bbox_semantic_feat = self.semantic_roi_extractor([semantic_feat],
rois)
if bbox_semantic_feat.shape[-2:] != bbox_feats.shape[-2:]:
bbox_semantic_feat = F.adaptive_avg_pool2d(
bbox_semantic_feat, bbox_feats.shape[-2:])
bbox_feats += bbox_semantic_feat
cls_score, bbox_pred = bbox_head(bbox_feats)
return cls_score, bbox_pred
def _mask_forward_test(self, stage, x, bboxes, semantic_feat=None):
mask_roi_extractor = self.mask_roi_extractor[stage]
mask_head = self.mask_head[stage]
mask_rois = bbox2roi([bboxes])
mask_feats = mask_roi_extractor(
x[:len(mask_roi_extractor.featmap_strides)], mask_rois)
if self.with_semantic and 'mask' in self.semantic_fusion:
mask_semantic_feat = self.semantic_roi_extractor([semantic_feat],
mask_rois)
if mask_semantic_feat.shape[-2:] != mask_feats.shape[-2:]:
mask_semantic_feat = F.adaptive_avg_pool2d(
mask_semantic_feat, mask_feats.shape[-2:])
mask_feats += mask_semantic_feat
if self.mask_info_flow:
last_feat = None
last_pred = None
for i in range(stage):
mask_pred, last_feat = self.mask_head[i](mask_feats, last_feat)
if last_pred is not None:
mask_pred = mask_pred + last_pred
last_pred = mask_pred
mask_pred = mask_head(mask_feats, last_feat, return_feat=False)
if last_pred is not None:
mask_pred = mask_pred + last_pred
else:
mask_pred = mask_head(mask_feats)
return mask_pred
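    # At test time the flow above also accumulates predictions: the loop
    # keeps a running sum of mask logits, so the value returned for stage i
    # is logits_0 + logits_1 + ... + logits_i. A comment-level sketch with
    # made-up names:
    #
    #     running = None
    #     for pred in per_stage_logits:
    #         running = pred if running is None else running + pred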
def forward_dummy(self, img):
outs = ()
# backbone
x = self.extract_feat(img)
# rpn
if self.with_rpn:
rpn_outs = self.rpn_head(x)
outs = outs + (rpn_outs, )
proposals = torch.randn(1000, 4).cuda()
# semantic head
if self.with_semantic:
_, semantic_feat = self.semantic_head(x)
else:
semantic_feat = None
# bbox heads
rois = bbox2roi([proposals])
for i in range(self.num_stages):
cls_score, bbox_pred = self._bbox_forward_test(
i, x, rois, semantic_feat=semantic_feat)
outs = outs + (cls_score, bbox_pred)
# mask heads
if self.with_mask:
mask_rois = rois[:100]
mask_roi_extractor = self.mask_roi_extractor[-1]
mask_feats = mask_roi_extractor(
x[:len(mask_roi_extractor.featmap_strides)], mask_rois)
            if self.with_semantic and 'mask' in self.semantic_fusion:
                mask_semantic_feat = self.semantic_roi_extractor(
                    [semantic_feat], mask_rois)
                if mask_semantic_feat.shape[-2:] != mask_feats.shape[-2:]:
                    mask_semantic_feat = F.adaptive_avg_pool2d(
                        mask_semantic_feat, mask_feats.shape[-2:])
                mask_feats += mask_semantic_feat
last_feat = None
for i in range(self.num_stages):
mask_head = self.mask_head[i]
if self.mask_info_flow:
mask_pred, last_feat = mask_head(mask_feats, last_feat)
else:
mask_pred = mask_head(mask_feats)
outs = outs + (mask_pred, )
return outs
def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
gt_semantic_seg=None,
proposals=None):
x = self.extract_feat(img)
losses = dict()
# RPN part, the same as normal two-stage detectors
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('rpn_proposal',
self.test_cfg.rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
# semantic segmentation part
# 2 outputs: segmentation prediction and embedded features
if self.with_semantic:
semantic_pred, semantic_feat = self.semantic_head(x)
loss_seg = self.semantic_head.loss(semantic_pred, gt_semantic_seg)
losses['loss_semantic_seg'] = loss_seg
else:
semantic_feat = None
for i in range(self.num_stages):
self.current_stage = i
rcnn_train_cfg = self.train_cfg.rcnn[i]
lw = self.train_cfg.stage_loss_weights[i]
# assign gts and sample proposals
sampling_results = []
bbox_assigner = build_assigner(rcnn_train_cfg.assigner)
bbox_sampler = build_sampler(rcnn_train_cfg.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
for j in range(num_imgs):
assign_result = bbox_assigner.assign(proposal_list[j],
gt_bboxes[j],
gt_bboxes_ignore[j],
gt_labels[j])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[j],
gt_bboxes[j],
gt_labels[j],
feats=[lvl_feat[j][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
loss_bbox, rois, bbox_targets, bbox_pred = \
self._bbox_forward_train(
i, x, sampling_results, gt_bboxes, gt_labels,
rcnn_train_cfg, semantic_feat)
roi_labels = bbox_targets[0]
for name, value in loss_bbox.items():
losses['s{}.{}'.format(i, name)] = (
value * lw if 'loss' in name else value)
# mask head forward and loss
if self.with_mask:
                # interleaved execution: use the boxes regressed by the bbox
                # branch to train the mask branch (see the comment sketch
                # after forward_train)
if self.interleaved:
pos_is_gts = [res.pos_is_gt for res in sampling_results]
with torch.no_grad():
proposal_list = self.bbox_head[i].refine_bboxes(
rois, roi_labels, bbox_pred, pos_is_gts, img_meta)
                    # re-assign and re-sample RoIs using the proposals just
                    # refined by this stage's bbox head
sampling_results = []
for j in range(num_imgs):
assign_result = bbox_assigner.assign(
proposal_list[j], gt_bboxes[j],
gt_bboxes_ignore[j], gt_labels[j])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[j],
gt_bboxes[j],
gt_labels[j],
feats=[lvl_feat[j][None] for lvl_feat in x])
sampling_results.append(sampling_result)
loss_mask = self._mask_forward_train(i, x, sampling_results,
gt_masks, rcnn_train_cfg,
semantic_feat)
for name, value in loss_mask.items():
losses['s{}.{}'.format(i, name)] = (
value * lw if 'loss' in name else value)
# refine bboxes (same as Cascade R-CNN)
if i < self.num_stages - 1 and not self.interleaved:
pos_is_gts = [res.pos_is_gt for res in sampling_results]
with torch.no_grad():
proposal_list = self.bbox_head[i].refine_bboxes(
rois, roi_labels, bbox_pred, pos_is_gts, img_meta)
return losses
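    # A comment-level timeline of one training stage above (labels are
    # descriptive only), contrasting the two execution modes:
    #
    #     plain cascade:  sample -> bbox head -> mask head -> refine boxes
    #     interleaved:    sample -> bbox head -> refine boxes
    #                            -> re-sample -> mask head
    #
    # With interleaving, the mask head of stage i trains on proposals
    # already refined by the bbox head of stage i, so the trailing refine
    # step is skipped (hence the `not self.interleaved` guard above).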
def simple_test(self, img, img_meta, proposals=None, rescale=False):
x = self.extract_feat(img)
proposal_list = self.simple_test_rpn(
x, img_meta, self.test_cfg.rpn) if proposals is None else proposals
if self.with_semantic:
_, semantic_feat = self.semantic_head(x)
else:
semantic_feat = None
img_shape = img_meta[0]['img_shape']
ori_shape = img_meta[0]['ori_shape']
scale_factor = img_meta[0]['scale_factor']
# "ms" in variable names means multi-stage
ms_bbox_result = {}
ms_segm_result = {}
ms_scores = []
rcnn_test_cfg = self.test_cfg.rcnn
rois = bbox2roi(proposal_list)
for i in range(self.num_stages):
bbox_head = self.bbox_head[i]
cls_score, bbox_pred = self._bbox_forward_test(
i, x, rois, semantic_feat=semantic_feat)
ms_scores.append(cls_score)
if i < self.num_stages - 1:
bbox_label = cls_score.argmax(dim=1)
rois = bbox_head.regress_by_class(rois, bbox_label, bbox_pred,
img_meta[0])
cls_score = sum(ms_scores) / float(len(ms_scores))
det_bboxes, det_labels = self.bbox_head[-1].get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
bbox_result = bbox2result(det_bboxes, det_labels,
self.bbox_head[-1].num_classes)
ms_bbox_result['ensemble'] = bbox_result
if self.with_mask:
if det_bboxes.shape[0] == 0:
mask_classes = self.mask_head[-1].num_classes - 1
segm_result = [[] for _ in range(mask_classes)]
else:
                _bboxes = (
                    det_bboxes[:, :4] * scale_factor
                    if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
aug_masks = []
mask_roi_extractor = self.mask_roi_extractor[-1]
mask_feats = mask_roi_extractor(
x[:len(mask_roi_extractor.featmap_strides)], mask_rois)
                if self.with_semantic and 'mask' in self.semantic_fusion:
                    mask_semantic_feat = self.semantic_roi_extractor(
                        [semantic_feat], mask_rois)
                    if mask_semantic_feat.shape[-2:] != mask_feats.shape[-2:]:
                        mask_semantic_feat = F.adaptive_avg_pool2d(
                            mask_semantic_feat, mask_feats.shape[-2:])
                    mask_feats += mask_semantic_feat
last_feat = None
for i in range(self.num_stages):
mask_head = self.mask_head[i]
if self.mask_info_flow:
mask_pred, last_feat = mask_head(mask_feats, last_feat)
else:
mask_pred = mask_head(mask_feats)
aug_masks.append(mask_pred.sigmoid().cpu().numpy())
merged_masks = merge_aug_masks(aug_masks,
[img_meta] * self.num_stages,
self.test_cfg.rcnn)
segm_result = self.mask_head[-1].get_seg_masks(
merged_masks, _bboxes, det_labels, rcnn_test_cfg,
ori_shape, scale_factor, rescale)
ms_segm_result['ensemble'] = segm_result
if self.with_mask:
results = (ms_bbox_result['ensemble'], ms_segm_result['ensemble'])
else:
results = ms_bbox_result['ensemble']
return results
def aug_test(self, imgs, img_metas, proposals=None, rescale=False):
"""Test with augmentations.
If rescale is False, then returned bboxes and masks will fit the scale
of imgs[0].
"""
if self.with_semantic:
semantic_feats = [
self.semantic_head(feat)[1]
for feat in self.extract_feats(imgs)
]
else:
semantic_feats = [None] * len(img_metas)
# recompute feats to save memory
proposal_list = self.aug_test_rpn(
self.extract_feats(imgs), img_metas, self.test_cfg.rpn)
rcnn_test_cfg = self.test_cfg.rcnn
aug_bboxes = []
aug_scores = []
for x, img_meta, semantic in zip(
self.extract_feats(imgs), img_metas, semantic_feats):
# only one image in the batch
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
proposals = bbox_mapping(proposal_list[0][:, :4], img_shape,
scale_factor, flip)
# "ms" in variable names means multi-stage
ms_scores = []
rois = bbox2roi([proposals])
for i in range(self.num_stages):
bbox_head = self.bbox_head[i]
cls_score, bbox_pred = self._bbox_forward_test(
i, x, rois, semantic_feat=semantic)
ms_scores.append(cls_score)
if i < self.num_stages - 1:
bbox_label = cls_score.argmax(dim=1)
rois = bbox_head.regress_by_class(rois, bbox_label,
bbox_pred, img_meta[0])
cls_score = sum(ms_scores) / float(len(ms_scores))
bboxes, scores = self.bbox_head[-1].get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=False,
cfg=None)
aug_bboxes.append(bboxes)
aug_scores.append(scores)
# after merging, bboxes will be rescaled to the original image size
merged_bboxes, merged_scores = merge_aug_bboxes(
aug_bboxes, aug_scores, img_metas, rcnn_test_cfg)
det_bboxes, det_labels = multiclass_nms(merged_bboxes, merged_scores,
rcnn_test_cfg.score_thr,
rcnn_test_cfg.nms,
rcnn_test_cfg.max_per_img)
bbox_result = bbox2result(det_bboxes, det_labels,
self.bbox_head[-1].num_classes)
if self.with_mask:
if det_bboxes.shape[0] == 0:
                segm_result = [
                    [] for _ in range(self.mask_head[-1].num_classes - 1)
                ]
else:
aug_masks = []
aug_img_metas = []
for x, img_meta, semantic in zip(
self.extract_feats(imgs), img_metas, semantic_feats):
img_shape = img_meta[0]['img_shape']
scale_factor = img_meta[0]['scale_factor']
flip = img_meta[0]['flip']
_bboxes = bbox_mapping(det_bboxes[:, :4], img_shape,
scale_factor, flip)
mask_rois = bbox2roi([_bboxes])
mask_feats = self.mask_roi_extractor[-1](
x[:len(self.mask_roi_extractor[-1].featmap_strides)],
mask_rois)
if self.with_semantic:
semantic_feat = semantic
mask_semantic_feat = self.semantic_roi_extractor(
[semantic_feat], mask_rois)
                        if (mask_semantic_feat.shape[-2:] !=
                                mask_feats.shape[-2:]):
                            mask_semantic_feat = F.adaptive_avg_pool2d(
                                mask_semantic_feat, mask_feats.shape[-2:])
mask_feats += mask_semantic_feat
last_feat = None
for i in range(self.num_stages):
mask_head = self.mask_head[i]
if self.mask_info_flow:
mask_pred, last_feat = mask_head(
mask_feats, last_feat)
else:
mask_pred = mask_head(mask_feats)
aug_masks.append(mask_pred.sigmoid().cpu().numpy())
aug_img_metas.append(img_meta)
merged_masks = merge_aug_masks(aug_masks, aug_img_metas,
self.test_cfg.rcnn)
ori_shape = img_metas[0][0]['ori_shape']
segm_result = self.mask_head[-1].get_seg_masks(
merged_masks,
det_bboxes,
det_labels,
rcnn_test_cfg,
ori_shape,
scale_factor=1.0,
rescale=False)
return bbox_result, segm_result
else:
return bbox_result
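# Illustrative sketch (not part of the original file): the semantic feature
# fusion used throughout HybridTaskCascade above. Pooled semantic features are
# resized to the RoI feature map with adaptive average pooling, then added
# element-wise. All tensor shapes below are made up for illustration.
def _demo_semantic_fusion():
    roi_feats = torch.randn(8, 256, 7, 7)    # 8 RoIs with 7x7 pooled features
    sem_feats = torch.randn(8, 256, 14, 14)  # pooled semantic features
    if sem_feats.shape[-2:] != roi_feats.shape[-2:]:
        sem_feats = F.adaptive_avg_pool2d(sem_feats, roi_feats.shape[-2:])
    return roi_feats + sem_feats             # element-wise sum, (8, 256, 7, 7)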
from ..registry import DETECTORS
from .two_stage import TwoStageDetector
@DETECTORS.register_module
class MaskRCNN(TwoStageDetector):
def __init__(self,
backbone,
rpn_head,
bbox_roi_extractor,
bbox_head,
mask_roi_extractor,
mask_head,
train_cfg,
test_cfg,
neck=None,
shared_head=None,
pretrained=None):
super(MaskRCNN, self).__init__(
backbone=backbone,
neck=neck,
shared_head=shared_head,
rpn_head=rpn_head,
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
mask_roi_extractor=mask_roi_extractor,
mask_head=mask_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained)
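# Illustrative sketch (not part of the original file): MaskRCNN adds nothing
# beyond TwoStageDetector; its value is being a named entry in the DETECTORS
# registry. An abridged, hypothetical config showing how such a detector is
# typically described; the field values are placeholders, not a complete
# working configuration (train_cfg and test_cfg, required by __init__, are
# supplied separately).
_demo_mask_rcnn_cfg = dict(
    type='MaskRCNN',
    backbone=dict(type='ResNet', depth=50),
    neck=dict(type='FPN', in_channels=[256, 512, 1024, 2048],
              out_channels=256, num_outs=5),
    rpn_head=dict(type='RPNHead', in_channels=256),
    bbox_roi_extractor=dict(type='SingleRoIExtractor'),
    bbox_head=dict(type='SharedFCBBoxHead', num_classes=81),
    mask_roi_extractor=dict(type='SingleRoIExtractor'),
    mask_head=dict(type='FCNMaskHead', num_classes=81))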
import torch
from mmdet.core import bbox2roi, build_assigner, build_sampler
from .. import builder
from ..registry import DETECTORS
from .two_stage import TwoStageDetector
@DETECTORS.register_module
class MaskScoringRCNN(TwoStageDetector):
"""Mask Scoring RCNN.
https://arxiv.org/abs/1903.00241
"""
def __init__(self,
backbone,
rpn_head,
bbox_roi_extractor,
bbox_head,
mask_roi_extractor,
mask_head,
train_cfg,
test_cfg,
neck=None,
shared_head=None,
mask_iou_head=None,
pretrained=None):
super(MaskScoringRCNN, self).__init__(
backbone=backbone,
neck=neck,
shared_head=shared_head,
rpn_head=rpn_head,
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
mask_roi_extractor=mask_roi_extractor,
mask_head=mask_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained)
self.mask_iou_head = builder.build_head(mask_iou_head)
self.mask_iou_head.init_weights()
def forward_dummy(self, img):
raise NotImplementedError
# TODO: refactor forward_train in two stage to reduce code redundancy
def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
proposals=None):
x = self.extract_feat(img)
losses = dict()
# RPN forward and loss
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('rpn_proposal',
self.test_cfg.rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
# assign gts and sample proposals
if self.with_bbox or self.with_mask:
bbox_assigner = build_assigner(self.train_cfg.rcnn.assigner)
bbox_sampler = build_sampler(
self.train_cfg.rcnn.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
sampling_results = []
for i in range(num_imgs):
assign_result = bbox_assigner.assign(proposal_list[i],
gt_bboxes[i],
gt_bboxes_ignore[i],
gt_labels[i])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[i],
gt_bboxes[i],
gt_labels[i],
feats=[lvl_feat[i][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
if self.with_bbox:
rois = bbox2roi([res.bboxes for res in sampling_results])
# TODO: a more flexible way to decide which feature maps to use
bbox_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = self.bbox_head(bbox_feats)
bbox_targets = self.bbox_head.get_target(sampling_results,
gt_bboxes, gt_labels,
self.train_cfg.rcnn)
loss_bbox = self.bbox_head.loss(cls_score, bbox_pred,
*bbox_targets)
losses.update(loss_bbox)
# mask head forward and loss
if self.with_mask:
if not self.share_roi_extractor:
pos_rois = bbox2roi(
[res.pos_bboxes for res in sampling_results])
mask_feats = self.mask_roi_extractor(
x[:self.mask_roi_extractor.num_inputs], pos_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
else:
pos_inds = []
device = bbox_feats.device
for res in sampling_results:
pos_inds.append(
torch.ones(
res.pos_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds.append(
torch.zeros(
res.neg_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds = torch.cat(pos_inds)
mask_feats = bbox_feats[pos_inds]
mask_pred = self.mask_head(mask_feats)
mask_targets = self.mask_head.get_target(sampling_results,
gt_masks,
self.train_cfg.rcnn)
pos_labels = torch.cat(
[res.pos_gt_labels for res in sampling_results])
loss_mask = self.mask_head.loss(mask_pred, mask_targets,
pos_labels)
losses.update(loss_mask)
# mask iou head forward and loss
pos_mask_pred = mask_pred[range(mask_pred.size(0)), pos_labels]
mask_iou_pred = self.mask_iou_head(mask_feats, pos_mask_pred)
pos_mask_iou_pred = mask_iou_pred[range(mask_iou_pred.size(0)),
pos_labels]
mask_iou_targets = self.mask_iou_head.get_target(
sampling_results, gt_masks, pos_mask_pred, mask_targets,
self.train_cfg.rcnn)
loss_mask_iou = self.mask_iou_head.loss(pos_mask_iou_pred,
mask_iou_targets)
losses.update(loss_mask_iou)
return losses
def simple_test_mask(self,
x,
img_meta,
det_bboxes,
det_labels,
rescale=False):
# image shape of the first image in the batch (only one)
ori_shape = img_meta[0]['ori_shape']
scale_factor = img_meta[0]['scale_factor']
if det_bboxes.shape[0] == 0:
segm_result = [[] for _ in range(self.mask_head.num_classes - 1)]
mask_scores = [[] for _ in range(self.mask_head.num_classes - 1)]
else:
# if det_bboxes is rescaled to the original image size, we need to
# rescale it back to the testing scale to obtain RoIs.
_bboxes = (
det_bboxes[:, :4] * scale_factor if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
mask_feats = self.mask_roi_extractor(
x[:len(self.mask_roi_extractor.featmap_strides)], mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head(mask_feats)
segm_result = self.mask_head.get_seg_masks(mask_pred, _bboxes,
det_labels,
self.test_cfg.rcnn,
ori_shape, scale_factor,
rescale)
# get mask scores with mask iou head
mask_iou_pred = self.mask_iou_head(
mask_feats, mask_pred[range(det_labels.size(0)),
det_labels + 1])
mask_scores = self.mask_iou_head.get_mask_scores(
mask_iou_pred, det_bboxes, det_labels)
return segm_result, mask_scores
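# Illustrative sketch (not part of the original file): the indexing pattern
# used above to pick each RoI's own-class channel from a per-class mask
# prediction, i.e. mask_pred[range(n), labels]. Shapes are made up.
def _demo_per_class_gather():
    n_rois, n_classes, s = 4, 81, 28
    mask_pred = torch.randn(n_rois, n_classes, s, s)  # per-class mask logits
    labels = torch.tensor([3, 17, 17, 60])            # one class id per RoI
    # row i of the result is mask_pred[i, labels[i]]; shape (n_rois, s, s)
    return mask_pred[range(n_rois), labels]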
import torch
from mmdet.core import bbox2result, bbox_mapping_back, multiclass_nms
from ..registry import DETECTORS
from .single_stage import SingleStageDetector
@DETECTORS.register_module
class RepPointsDetector(SingleStageDetector):
"""RepPoints: Point Set Representation for Object Detection.
This detector is the implementation of:
- RepPoints detector (https://arxiv.org/pdf/1904.11490)
"""
def __init__(self,
backbone,
neck,
bbox_head,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(RepPointsDetector,
self).__init__(backbone, neck, bbox_head, train_cfg, test_cfg,
pretrained)
    def merge_aug_results(self, aug_bboxes, aug_scores, img_metas):
        """Merge augmented detection bboxes and scores.

        Args:
            aug_bboxes (list[Tensor]): shape (n, 4*#class)
            aug_scores (list[Tensor] or None): shape (n, #class)
            img_metas (list[list[dict]]): meta info for each augmented image,
                including 'img_shape', 'scale_factor' and 'flip'.

        Returns:
            tuple: (bboxes, scores)
        """
recovered_bboxes = []
for bboxes, img_info in zip(aug_bboxes, img_metas):
img_shape = img_info[0]['img_shape']
scale_factor = img_info[0]['scale_factor']
flip = img_info[0]['flip']
bboxes = bbox_mapping_back(bboxes, img_shape, scale_factor, flip)
recovered_bboxes.append(bboxes)
bboxes = torch.cat(recovered_bboxes, dim=0)
if aug_scores is None:
return bboxes
else:
scores = torch.cat(aug_scores, dim=0)
return bboxes, scores
def aug_test(self, imgs, img_metas, rescale=False):
# recompute feats to save memory
feats = self.extract_feats(imgs)
aug_bboxes = []
aug_scores = []
for x, img_meta in zip(feats, img_metas):
# only one image in the batch
outs = self.bbox_head(x)
bbox_inputs = outs + (img_meta, self.test_cfg, False, False)
det_bboxes, det_scores = self.bbox_head.get_bboxes(*bbox_inputs)[0]
aug_bboxes.append(det_bboxes)
aug_scores.append(det_scores)
# after merging, bboxes will be rescaled to the original image size
merged_bboxes, merged_scores = self.merge_aug_results(
aug_bboxes, aug_scores, img_metas)
det_bboxes, det_labels = multiclass_nms(merged_bboxes, merged_scores,
self.test_cfg.score_thr,
self.test_cfg.nms,
self.test_cfg.max_per_img)
if rescale:
_det_bboxes = det_bboxes
else:
_det_bboxes = det_bboxes.clone()
_det_bboxes[:, :4] *= img_metas[0][0]['scale_factor']
bbox_results = bbox2result(_det_bboxes, det_labels,
self.bbox_head.num_classes)
return bbox_results
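# Illustrative sketch (not part of the original file): the mapping-back step
# that merge_aug_results relies on. Boxes predicted on a flipped, rescaled
# test image are undone before merging; this is a minimal re-implementation
# under the [x1, y1, x2, y2] convention, and the actual mmdet helper may
# differ in details such as a -1 pixel offset.
def _demo_bbox_mapping_back(bboxes, img_shape, scale_factor, flip):
    if flip:  # undo the horizontal flip at the test-image scale
        flipped = bboxes.clone()
        flipped[:, 0] = img_shape[1] - bboxes[:, 2]
        flipped[:, 2] = img_shape[1] - bboxes[:, 0]
        bboxes = flipped
    return bboxes / scale_factor  # undo the test-scale resizing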
from ..registry import DETECTORS
from .single_stage import SingleStageDetector
@DETECTORS.register_module
class RetinaNet(SingleStageDetector):
def __init__(self,
backbone,
neck,
bbox_head,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(RetinaNet, self).__init__(backbone, neck, bbox_head, train_cfg,
test_cfg, pretrained)
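# Illustrative sketch (not part of the original file): RetinaNet contributes
# no new logic over SingleStageDetector; registering the class under its name
# is the whole point. A simplified re-implementation of that registry pattern
# (not mmdet's actual Registry API):
class _DemoRegistry(object):
    def __init__(self):
        self._modules = {}

    def register_module(self, cls):  # used as a class decorator
        self._modules[cls.__name__] = cls
        return cls

    def build(self, cfg):  # look up 'type' and pass the remaining kwargs
        args = dict(cfg)
        return self._modules[args.pop('type')](**args)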
import mmcv
from mmdet.core import bbox_mapping, tensor2imgs
from .. import builder
from ..registry import DETECTORS
from .base import BaseDetector
from .test_mixins import RPNTestMixin
@DETECTORS.register_module
class RPN(BaseDetector, RPNTestMixin):
def __init__(self,
backbone,
neck,
rpn_head,
train_cfg,
test_cfg,
pretrained=None):
super(RPN, self).__init__()
self.backbone = builder.build_backbone(backbone)
self.neck = builder.build_neck(neck) if neck is not None else None
self.rpn_head = builder.build_head(rpn_head)
self.train_cfg = train_cfg
self.test_cfg = test_cfg
self.init_weights(pretrained=pretrained)
def init_weights(self, pretrained=None):
super(RPN, self).init_weights(pretrained)
self.backbone.init_weights(pretrained=pretrained)
if self.with_neck:
self.neck.init_weights()
self.rpn_head.init_weights()
def extract_feat(self, img):
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
return x
def forward_dummy(self, img):
x = self.extract_feat(img)
rpn_outs = self.rpn_head(x)
return rpn_outs
def forward_train(self,
img,
img_meta,
gt_bboxes=None,
gt_bboxes_ignore=None):
if self.train_cfg.rpn.get('debug', False):
self.rpn_head.debug_imgs = tensor2imgs(img)
x = self.extract_feat(img)
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta, self.train_cfg.rpn)
losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
return losses
def simple_test(self, img, img_meta, rescale=False):
x = self.extract_feat(img)
proposal_list = self.simple_test_rpn(x, img_meta, self.test_cfg.rpn)
if rescale:
for proposals, meta in zip(proposal_list, img_meta):
proposals[:, :4] /= meta['scale_factor']
# TODO: remove this restriction
return proposal_list[0].cpu().numpy()
def aug_test(self, imgs, img_metas, rescale=False):
proposal_list = self.aug_test_rpn(
self.extract_feats(imgs), img_metas, self.test_cfg.rpn)
if not rescale:
for proposals, img_meta in zip(proposal_list, img_metas[0]):
img_shape = img_meta['img_shape']
scale_factor = img_meta['scale_factor']
flip = img_meta['flip']
proposals[:, :4] = bbox_mapping(proposals[:, :4], img_shape,
scale_factor, flip)
# TODO: remove this restriction
return proposal_list[0].cpu().numpy()
    def show_result(self, data, result, dataset=None, top_k=20):
        """Show RPN proposals on the image.

        Although we assume batch size is 1, this method supports arbitrary
        batch size.
        """
img_tensor = data['img'][0]
img_metas = data['img_meta'][0].data[0]
imgs = tensor2imgs(img_tensor, **img_metas[0]['img_norm_cfg'])
assert len(imgs) == len(img_metas)
for img, img_meta in zip(imgs, img_metas):
h, w, _ = img_meta['img_shape']
img_show = img[:h, :w, :]
mmcv.imshow_bboxes(img_show, result, top_k=top_k)
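# Illustrative sketch (not part of the original file): simple_test above
# returns one image's proposals as an (n, 5) numpy array whose rows are
# [x1, y1, x2, y2, score]. A hypothetical post-processing step that keeps
# only confident proposals:
def _demo_filter_proposals(proposals, score_thr=0.7):
    # proposals: numpy array of shape (n, 5); objectness score is column 4
    return proposals[proposals[:, 4] > score_thr]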