Commit 108fc9e1 authored by Kai Chen

set up the codebase skeleton (WIP)

parent 6985ef31
# --- ConvModule (imported below as `from ..common import ConvModule`) ---
import warnings

import torch.nn as nn

from .norm import build_norm_layer


class ConvModule(nn.Module):

    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride=1,
                 padding=0,
                 dilation=1,
                 groups=1,
                 bias=True,
                 normalize=None,
                 activation='relu',
                 inplace=True,
                 activate_last=True):
        super(ConvModule, self).__init__()
        self.with_norm = normalize is not None
        self.with_activation = activation is not None
        self.with_bias = bias
        self.activation = activation
        self.activate_last = activate_last

        if self.with_norm and self.with_bias:
            warnings.warn('ConvModule has norm and bias at the same time')

        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride,
            padding,
            dilation,
            groups,
            bias=bias)
        # expose the conv attributes for convenience
        self.in_channels = self.conv.in_channels
        self.out_channels = self.conv.out_channels
        self.kernel_size = self.conv.kernel_size
        self.stride = self.conv.stride
        self.padding = self.conv.padding
        self.dilation = self.conv.dilation
        self.transposed = self.conv.transposed
        self.output_padding = self.conv.output_padding
        self.groups = self.conv.groups

        if self.with_norm:
            # norm follows the conv when activate_last is True and precedes
            # it otherwise, hence the different num_features
            if self.activate_last:
                self.norm = build_norm_layer(normalize, out_channels)
            else:
                self.norm = build_norm_layer(normalize, in_channels)

        if self.with_activation:
            assert activation in ['relu'], 'Only ReLU is supported.'
            if self.activation == 'relu':
                self.activate = nn.ReLU(inplace=inplace)

        # default to msra (Kaiming) init
        self.init_weights()

    def init_weights(self):
        nonlinearity = 'relu' if self.activation is None else self.activation
        nn.init.kaiming_normal_(
            self.conv.weight, mode='fan_out', nonlinearity=nonlinearity)
        if self.with_bias:
            nn.init.constant_(self.conv.bias, 0)
        if self.with_norm:
            nn.init.constant_(self.norm.weight, 1)
            nn.init.constant_(self.norm.bias, 0)

    def forward(self, x, activate=True, norm=True):
        if self.activate_last:
            # conv -> norm -> activation
            x = self.conv(x)
            if norm and self.with_norm:
                x = self.norm(x)
            if activate and self.with_activation:
                x = self.activate(x)
        else:
            # norm -> activation -> conv
            if norm and self.with_norm:
                x = self.norm(x)
            if activate and self.with_activation:
                x = self.activate(x)
            x = self.conv(x)
        return x
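A usage sketch (not part of the commit itself; assumes ConvModule above is importable). With normalize=None the block is just conv followed by ReLU, and the per-call switches can skip parts of it:

import torch

conv = ConvModule(3, 8, 3, padding=1)  # conv -> ReLU (activate_last=True)
x = torch.rand(1, 3, 32, 32)
y = conv(x)                    # shape (1, 8, 32, 32)
y = conv(x, activate=False)    # skip the ReLU for this call only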
# --- norm.py (inferred from `from .norm import build_norm_layer` above) ---
import torch.nn as nn

norm_cfg = {'BN': nn.BatchNorm2d, 'SyncBN': None, 'GN': None}


def build_norm_layer(cfg, num_features):
    assert isinstance(cfg, dict) and 'type' in cfg
    cfg_ = cfg.copy()
    cfg_.setdefault('eps', 1e-5)
    layer_type = cfg_.pop('type')
    if layer_type not in norm_cfg:
        raise KeyError('Unrecognized norm type {}'.format(layer_type))
    elif norm_cfg[layer_type] is None:
        raise NotImplementedError
    return norm_cfg[layer_type](num_features, **cfg_)
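A sketch combining the two modules above: a dict config selects the norm layer, and passing normalize to ConvModule inserts it after the conv (bias is then usually disabled):

bn = build_norm_layer(dict(type='BN'), 64)   # nn.BatchNorm2d(64, eps=1e-5)
conv_bn = ConvModule(
    3, 64, 3, padding=1, bias=False, normalize=dict(type='BN'))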
# --- RPN detector (proposal-only model) ---
import torch.nn as nn

from mmdet.core import tensor2imgs, merge_aug_proposals, bbox_mapping
from .. import builder


class RPN(nn.Module):

    def __init__(self,
                 backbone,
                 neck,
                 rpn_head,
                 rpn_train_cfg,
                 rpn_test_cfg,
                 pretrained=None):
        super(RPN, self).__init__()
        self.backbone = builder.build_backbone(backbone)
        self.neck = builder.build_neck(neck) if neck is not None else None
        self.rpn_head = builder.build_rpn_head(rpn_head)
        self.rpn_train_cfg = rpn_train_cfg
        self.rpn_test_cfg = rpn_test_cfg
        self.init_weights(pretrained=pretrained)

    def init_weights(self, pretrained=None):
        if pretrained is not None:
            print('load model from: {}'.format(pretrained))
        self.backbone.init_weights(pretrained=pretrained)
        if self.neck is not None:
            self.neck.init_weights()
        self.rpn_head.init_weights()

    def forward(self,
                img,
                img_meta,
                gt_bboxes=None,
                return_loss=True,
                return_bboxes=False,
                rescale=False):
        if not return_loss:
            return self.test(img, img_meta, rescale)

        img_shapes = img_meta['shape_scale']
        if self.rpn_train_cfg.get('debug', False):
            self.rpn_head.debug_imgs = tensor2imgs(img)

        x = self.backbone(img)
        if self.neck is not None:
            x = self.neck(x)
        rpn_outs = self.rpn_head(x)

        rpn_loss_inputs = rpn_outs + (gt_bboxes, img_shapes,
                                      self.rpn_train_cfg)
        losses = self.rpn_head.loss(*rpn_loss_inputs)
        return losses

    def test(self, imgs, img_metas, rescale=False):
        """Test w/ or w/o augmentations."""
        assert isinstance(imgs, list) and isinstance(img_metas, list)
        assert len(imgs) == len(img_metas)
        img_per_gpu = imgs[0].size(0)
        assert img_per_gpu == 1
        if len(imgs) == 1:
            return self.simple_test(imgs[0], img_metas[0], rescale)
        else:
            return self.aug_test(imgs, img_metas, rescale)

    def simple_test(self, img, img_meta, rescale=False):
        img_shapes = img_meta['shape_scale']
        # get feature maps
        x = self.backbone(img)
        if self.neck is not None:
            x = self.neck(x)
        rpn_outs = self.rpn_head(x)
        proposal_inputs = rpn_outs + (img_shapes, self.rpn_test_cfg)
        proposals = self.rpn_head.get_proposals(*proposal_inputs)[0]
        if rescale:
            proposals[:, :4] /= img_shapes[0][-1]
        return proposals.cpu().numpy()

    def aug_test(self, imgs, img_metas, rescale=False):
        aug_proposals = []
        for img, img_meta in zip(imgs, img_metas):
            x = self.backbone(img)
            if self.neck is not None:
                x = self.neck(x)
            rpn_outs = self.rpn_head(x)
            proposal_inputs = rpn_outs + (img_meta['shape_scale'],
                                          self.rpn_test_cfg)
            proposal_list = self.rpn_head.get_proposals(*proposal_inputs)
            assert len(proposal_list) == 1
            aug_proposals.append(proposal_list[0])  # len(proposal_list) = 1
        merged_proposals = merge_aug_proposals(aug_proposals, img_metas,
                                               self.rpn_test_cfg)
        if not rescale:
            img_shape = img_metas[0]['shape_scale'][0]
            flip = img_metas[0]['flip'][0]
            merged_proposals[:, :4] = bbox_mapping(merged_proposals[:, :4],
                                                   img_shape, flip)
        return merged_proposals.cpu().numpy()
# --- TwoStageDetector (Faster/Mask R-CNN style detector) ---
import torch
import torch.nn as nn

from mmdet.core import (bbox2roi, bbox_mapping, split_combined_gt_polys,
                        bbox_sampling, multiclass_nms, merge_aug_proposals,
                        merge_aug_bboxes, merge_aug_masks, bbox2result)
from mmdet.core.utils import tensor2imgs
from .. import builder


class TwoStageDetector(nn.Module):

    def __init__(self,
                 backbone,
                 neck,
                 rpn_head,
                 roi_block,
                 bbox_head,
                 rpn_train_cfg,
                 rpn_test_cfg,
                 rcnn_train_cfg,
                 rcnn_test_cfg,
                 mask_block=None,
                 mask_head=None,
                 pretrained=None):
        super(TwoStageDetector, self).__init__()
        self.backbone = builder.build_backbone(backbone)
        self.neck = builder.build_neck(neck) if neck is not None else None
        self.rpn_head = builder.build_rpn_head(rpn_head)
        self.bbox_roi_extractor = builder.build_roi_block(roi_block)
        self.bbox_head = builder.build_bbox_head(bbox_head)
        self.mask_roi_extractor = builder.build_roi_block(mask_block) if (
            mask_block is not None) else None
        self.mask_head = builder.build_mask_head(mask_head) if (
            mask_head is not None) else None
        self.with_mask = self.mask_head is not None
        self.rpn_train_cfg = rpn_train_cfg
        self.rpn_test_cfg = rpn_test_cfg
        self.rcnn_train_cfg = rcnn_train_cfg
        self.rcnn_test_cfg = rcnn_test_cfg
        self.init_weights(pretrained=pretrained)

    def init_weights(self, pretrained=None):
        if pretrained is not None:
            print('load model from: {}'.format(pretrained))
        self.backbone.init_weights(pretrained=pretrained)
        if self.neck is not None:
            if isinstance(self.neck, nn.Sequential):
                for m in self.neck:
                    m.init_weights()
            else:
                self.neck.init_weights()
        self.rpn_head.init_weights()
        self.bbox_roi_extractor.init_weights()
        self.bbox_head.init_weights()
        if self.mask_roi_extractor is not None:
            self.mask_roi_extractor.init_weights()
        if self.mask_head is not None:
            self.mask_head.init_weights()

    def forward(self,
                img,
                img_meta,
                gt_bboxes=None,
                gt_labels=None,
                gt_ignore=None,
                gt_polys=None,
                gt_poly_lens=None,
                num_polys_per_mask=None,
                return_loss=True,
                return_bboxes=False,
                rescale=False):
        if not return_loss:
            return self.test(img, img_meta, rescale)

        if not self.with_mask:
            assert (gt_polys is None and gt_poly_lens is None
                    and num_polys_per_mask is None)
        else:
            assert (gt_polys is not None and gt_poly_lens is not None
                    and num_polys_per_mask is not None)
            gt_polys = split_combined_gt_polys(gt_polys, gt_poly_lens,
                                               num_polys_per_mask)

        if self.rpn_train_cfg.get('debug', False):
            self.rpn_head.debug_imgs = tensor2imgs(img)
        if self.rcnn_train_cfg.get('debug', False):
            self.bbox_head.debug_imgs = tensor2imgs(img)
            if self.mask_head is not None:
                self.mask_head.debug_imgs = tensor2imgs(img)

        img_shapes = img_meta['shape_scale']

        x = self.backbone(img)
        if self.neck is not None:
            x = self.neck(x)

        rpn_outs = self.rpn_head(x)
        proposal_inputs = rpn_outs + (img_shapes, self.rpn_test_cfg)
        proposal_list = self.rpn_head.get_proposals(*proposal_inputs)

        (pos_inds, neg_inds, pos_proposals, neg_proposals,
         pos_assigned_gt_inds, pos_gt_bboxes, pos_gt_labels) = bbox_sampling(
             proposal_list, gt_bboxes, gt_ignore, gt_labels,
             self.rcnn_train_cfg)
        (labels, label_weights, bbox_targets,
         bbox_weights) = self.bbox_head.proposal_target(
             pos_proposals, neg_proposals, pos_gt_bboxes, pos_gt_labels,
             self.rcnn_train_cfg)

        rois = bbox2roi([
            torch.cat([pos, neg], dim=0)
            for pos, neg in zip(pos_proposals, neg_proposals)
        ])
        # TODO: a more flexible way to configure the feature maps
        roi_feats = self.bbox_roi_extractor(
            x[:self.bbox_roi_extractor.num_inputs], rois)
        cls_score, bbox_pred = self.bbox_head(roi_feats)

        losses = dict()
        rpn_loss_inputs = rpn_outs + (gt_bboxes, img_shapes,
                                      self.rpn_train_cfg)
        rpn_losses = self.rpn_head.loss(*rpn_loss_inputs)
        losses.update(rpn_losses)

        loss_bbox = self.bbox_head.loss(cls_score, bbox_pred, labels,
                                        label_weights, bbox_targets,
                                        bbox_weights)
        losses.update(loss_bbox)

        if self.with_mask:
            mask_targets = self.mask_head.mask_target(
                pos_proposals, pos_assigned_gt_inds, gt_polys, img_shapes,
                self.rcnn_train_cfg)
            pos_rois = bbox2roi(pos_proposals)
            mask_feats = self.mask_roi_extractor(
                x[:self.mask_roi_extractor.num_inputs], pos_rois)
            mask_pred = self.mask_head(mask_feats)
            losses['loss_mask'] = self.mask_head.loss(
                mask_pred, mask_targets, torch.cat(pos_gt_labels))
        return losses

    def test(self, imgs, img_metas, rescale=False):
        """Test w/ or w/o augmentations."""
        assert isinstance(imgs, list) and isinstance(img_metas, list)
        assert len(imgs) == len(img_metas)
        img_per_gpu = imgs[0].size(0)
        assert img_per_gpu == 1
        if len(imgs) == 1:
            return self.simple_test(imgs[0], img_metas[0], rescale)
        else:
            return self.aug_test(imgs, img_metas, rescale)

    def simple_test_bboxes(self, x, img_meta, rescale=False):
        """Test only det bboxes without augmentation."""
        img_shapes = img_meta['shape_scale']
        rpn_outs = self.rpn_head(x)
        proposal_inputs = rpn_outs + (img_shapes, self.rpn_test_cfg)
        proposal_list = self.rpn_head.get_proposals(*proposal_inputs)
        rois = bbox2roi(proposal_list)
        roi_feats = self.bbox_roi_extractor(
            x[:len(self.bbox_roi_extractor.featmap_strides)], rois)
        cls_score, bbox_pred = self.bbox_head(roi_feats)
        # image shape of the first image in the batch (only one)
        img_shape = img_shapes[0]
        det_bboxes, det_labels = self.bbox_head.get_det_bboxes(
            rois,
            cls_score,
            bbox_pred,
            img_shape,
            rescale=rescale,
            nms_cfg=self.rcnn_test_cfg)
        return det_bboxes, det_labels

    def simple_test_mask(self,
                         x,
                         img_meta,
                         det_bboxes,
                         det_labels,
                         rescale=False):
        # image shape of the first image in the batch (only one)
        img_shape = img_meta['shape_scale'][0]
        if det_bboxes.shape[0] == 0:
            segm_result = [[] for _ in range(self.mask_head.num_classes - 1)]
        else:
            # if det_bboxes were rescaled to the original image size, rescale
            # them back to the testing scale to obtain RoIs
            _bboxes = (det_bboxes[:, :4] * img_shape[-1]
                       if rescale else det_bboxes)
            mask_rois = bbox2roi([_bboxes])
            mask_feats = self.mask_roi_extractor(
                x[:len(self.mask_roi_extractor.featmap_strides)], mask_rois)
            mask_pred = self.mask_head(mask_feats)
            segm_result = self.mask_head.get_seg_masks(
                mask_pred, det_bboxes, det_labels, img_shape,
                self.rcnn_test_cfg, rescale)
        return segm_result

    def simple_test(self, img, img_meta, rescale=False):
        """Test without augmentation."""
        # get feature maps
        x = self.backbone(img)
        if self.neck is not None:
            x = self.neck(x)
        det_bboxes, det_labels = self.simple_test_bboxes(
            x, img_meta, rescale=rescale)
        bbox_result = bbox2result(det_bboxes, det_labels,
                                  self.bbox_head.num_classes)
        if not self.with_mask:
            return bbox_result

        segm_result = self.simple_test_mask(
            x, img_meta, det_bboxes, det_labels, rescale=rescale)
        return bbox_result, segm_result

    def aug_test_bboxes(self, imgs, img_metas):
        """Test with augmentations for det bboxes."""
        # step 1: get RPN proposals for augmented images, apply NMS to the
        # union of all proposals
        aug_proposals = []
        for img, img_meta in zip(imgs, img_metas):
            x = self.backbone(img)
            if self.neck is not None:
                x = self.neck(x)
            rpn_outs = self.rpn_head(x)
            proposal_inputs = rpn_outs + (img_meta['shape_scale'],
                                          self.rpn_test_cfg)
            proposal_list = self.rpn_head.get_proposals(*proposal_inputs)
            assert len(proposal_list) == 1
            aug_proposals.append(proposal_list[0])  # len(proposal_list) = 1
        # after merging, proposals will be rescaled to the original image size
        merged_proposals = merge_aug_proposals(aug_proposals, img_metas,
                                               self.rpn_test_cfg)
        # step 2: given merged proposals, predict bboxes for augmented images,
        # output the union of these bboxes
        aug_bboxes = []
        aug_scores = []
        for img, img_meta in zip(imgs, img_metas):
            # only one image in the batch
            img_shape = img_meta['shape_scale'][0]
            flip = img_meta['flip'][0]
            proposals = bbox_mapping(merged_proposals[:, :4], img_shape, flip)
            rois = bbox2roi([proposals])
            # recompute feature maps to save GPU memory
            x = self.backbone(img)
            if self.neck is not None:
                x = self.neck(x)
            roi_feats = self.bbox_roi_extractor(
                x[:len(self.bbox_roi_extractor.featmap_strides)], rois)
            cls_score, bbox_pred = self.bbox_head(roi_feats)
            bboxes, scores = self.bbox_head.get_det_bboxes(
                rois,
                cls_score,
                bbox_pred,
                img_shape,
                rescale=False,
                nms_cfg=None)
            aug_bboxes.append(bboxes)
            aug_scores.append(scores)
        # after merging, bboxes will be rescaled to the original image size
        merged_bboxes, merged_scores = merge_aug_bboxes(
            aug_bboxes, aug_scores, img_metas, self.rcnn_test_cfg)
        det_bboxes, det_labels = multiclass_nms(
            merged_bboxes, merged_scores, self.rcnn_test_cfg.score_thr,
            self.rcnn_test_cfg.nms_thr, self.rcnn_test_cfg.max_per_img)
        return det_bboxes, det_labels

    def aug_test_mask(self,
                      imgs,
                      img_metas,
                      det_bboxes,
                      det_labels,
                      rescale=False):
        # step 3: given merged bboxes, predict masks for augmented images,
        # mask scores are averaged across augmented images
        if rescale:
            _det_bboxes = det_bboxes
        else:
            _det_bboxes = det_bboxes.clone()
            _det_bboxes[:, :4] *= img_metas[0]['shape_scale'][0][-1]
        if det_bboxes.shape[0] == 0:
            segm_result = [[] for _ in range(self.mask_head.num_classes - 1)]
        else:
            aug_masks = []
            for img, img_meta in zip(imgs, img_metas):
                img_shape = img_meta['shape_scale'][0]
                flip = img_meta['flip'][0]
                _bboxes = bbox_mapping(det_bboxes[:, :4], img_shape, flip)
                mask_rois = bbox2roi([_bboxes])
                x = self.backbone(img)
                if self.neck is not None:
                    x = self.neck(x)
                mask_feats = self.mask_roi_extractor(
                    x[:len(self.mask_roi_extractor.featmap_strides)],
                    mask_rois)
                mask_pred = self.mask_head(mask_feats)
                # convert to numpy array to save memory
                aug_masks.append(mask_pred.sigmoid().cpu().numpy())
            merged_masks = merge_aug_masks(aug_masks, img_metas,
                                           self.rcnn_test_cfg)
            segm_result = self.mask_head.get_seg_masks(
                merged_masks, _det_bboxes, det_labels,
                img_metas[0]['shape_scale'][0], self.rcnn_test_cfg, rescale)
        return segm_result

    def aug_test(self, imgs, img_metas, rescale=False):
        """Test with augmentations.

        If rescale is False, the returned bboxes and masks will fit the scale
        of imgs[0].
        """
        # aug test det bboxes
        det_bboxes, det_labels = self.aug_test_bboxes(imgs, img_metas)
        if rescale:
            _det_bboxes = det_bboxes
        else:
            _det_bboxes = det_bboxes.clone()
            _det_bboxes[:, :4] *= img_metas[0]['shape_scale'][0][-1]
        bbox_result = bbox2result(_det_bboxes, det_labels,
                                  self.bbox_head.num_classes)
        if not self.with_mask:
            return bbox_result

        segm_result = self.aug_test_mask(
            imgs, img_metas, det_bboxes, det_labels, rescale=rescale)
        return bbox_result, segm_result
# --- mask head package __init__ (inferred) ---
from .fcn_mask_head import FCNMaskHead

__all__ = ['FCNMaskHead']
# --- fcn_mask_head.py (inferred from the import above) ---
import mmcv
import numpy as np
import pycocotools.mask as mask_util
import torch
import torch.nn as nn
import torch.utils.checkpoint as cp

from mmdet.core import mask_target, mask_cross_entropy
from ..common import ConvModule


class FCNMaskHead(nn.Module):

    def __init__(self,
                 num_convs=4,
                 roi_feat_size=14,
                 in_channels=256,
                 conv_kernel_size=3,
                 conv_out_channels=256,
                 upsample_method='deconv',
                 upsample_ratio=2,
                 num_classes=81,
                 class_agnostic=False,
                 with_cp=False,
                 normalize=None):
        super(FCNMaskHead, self).__init__()
        if upsample_method not in [None, 'deconv', 'nearest', 'bilinear']:
            raise ValueError(
                'Invalid upsample method {}, accepted methods '
                'are "deconv", "nearest", "bilinear"'.format(upsample_method))
        self.num_convs = num_convs
        self.roi_feat_size = roi_feat_size  # WARN: not used, reserved
        self.in_channels = in_channels
        self.conv_kernel_size = conv_kernel_size
        self.conv_out_channels = conv_out_channels
        self.upsample_method = upsample_method
        self.upsample_ratio = upsample_ratio
        self.num_classes = num_classes
        self.class_agnostic = class_agnostic
        self.normalize = normalize
        self.with_bias = normalize is None
        self.with_cp = with_cp

        self.convs = nn.ModuleList()
        for i in range(self.num_convs):
            in_channels = (self.in_channels
                           if i == 0 else self.conv_out_channels)
            padding = (self.conv_kernel_size - 1) // 2
            self.convs.append(
                ConvModule(
                    in_channels,
                    self.conv_out_channels,
                    self.conv_kernel_size,
                    padding=padding,
                    normalize=normalize,
                    bias=self.with_bias))
        if self.upsample_method is None:
            self.upsample = None
        elif self.upsample_method == 'deconv':
            self.upsample = nn.ConvTranspose2d(
                self.conv_out_channels,
                self.conv_out_channels,
                self.upsample_ratio,
                stride=self.upsample_ratio)
        else:
            self.upsample = nn.Upsample(
                scale_factor=self.upsample_ratio, mode=self.upsample_method)

        out_channels = 1 if self.class_agnostic else self.num_classes
        self.conv_logits = nn.Conv2d(self.conv_out_channels, out_channels, 1)
        self.relu = nn.ReLU(inplace=True)
        self.debug_imgs = None

    def init_weights(self):
        for m in [self.upsample, self.conv_logits]:
            # skip None and parameter-free layers such as nn.Upsample
            if m is None or not hasattr(m, 'weight'):
                continue
            nn.init.kaiming_normal_(
                m.weight, mode='fan_out', nonlinearity='relu')
            nn.init.constant_(m.bias, 0)

    def convs_forward(self, x):

        def m_lvl_convs_forward(x):
            # middle convs, eligible for gradient checkpointing
            for conv in self.convs[1:-1]:
                x = conv(x)
            return x

        if self.num_convs > 0:
            x = self.convs[0](x)
            if self.num_convs > 1:
                if self.with_cp and x.requires_grad:
                    x = cp.checkpoint(m_lvl_convs_forward, x)
                else:
                    x = m_lvl_convs_forward(x)
                x = self.convs[-1](x)
        return x

    def forward(self, x):
        x = self.convs_forward(x)
        if self.upsample is not None:
            x = self.upsample(x)
            if self.upsample_method == 'deconv':
                x = self.relu(x)
        mask_pred = self.conv_logits(x)
        return mask_pred

    def mask_target(self, pos_proposals, pos_assigned_gt_inds, gt_masks,
                    img_shapes, rcnn_train_cfg):
        mask_targets = mask_target(pos_proposals, pos_assigned_gt_inds,
                                   gt_masks, img_shapes, rcnn_train_cfg)
        return mask_targets

    def loss(self, mask_pred, mask_targets, labels):
        loss_mask = mask_cross_entropy(mask_pred, mask_targets, labels)
        return loss_mask

    def get_seg_masks(self,
                      mask_pred,
                      det_bboxes,
                      det_labels,
                      img_shape,
                      rcnn_test_cfg,
                      ori_scale,
                      rescale=True):
        """Get segmentation masks from mask_pred and bboxes.

        Args:
            mask_pred (Tensor or ndarray): shape (n, #class+1, h, w).
                For single-scale testing, mask_pred is the direct output of
                the model, whose type is Tensor, while for multi-scale
                testing, it will be converted to a numpy array outside of
                this method.
            det_bboxes (Tensor): shape (n, 4/5)
            det_labels (Tensor): shape (n, )
            img_shape (Tensor): shape (3, )
            rcnn_test_cfg (dict): rcnn testing config
            ori_scale (dict): original image height and width
            rescale (bool): whether to rescale masks to the original image
                size

        Returns:
            list[list]: encoded masks
        """
        # WARN: the call sites in TwoStageDetector above pass rescale as the
        # sixth positional argument, where this signature expects ori_scale;
        # the interface is still WIP and needs to be reconciled
        if isinstance(mask_pred, torch.Tensor):
            mask_pred = mask_pred.sigmoid().cpu().numpy()
        assert isinstance(mask_pred, np.ndarray)

        cls_segms = [[] for _ in range(self.num_classes - 1)]
        bboxes = det_bboxes.cpu().numpy()[:, :4]
        labels = det_labels.cpu().numpy() + 1
        scale_factor = img_shape[-1] if rescale else 1.0
        img_h = ori_scale['height'] if rescale else np.round(
            ori_scale['height'].item() * img_shape[-1].item()).astype(
                np.int32)
        img_w = ori_scale['width'] if rescale else np.round(
            ori_scale['width'].item() * img_shape[-1].item()).astype(np.int32)

        for i in range(bboxes.shape[0]):
            bbox = (bboxes[i, :] / float(scale_factor)).astype(int)
            label = labels[i]
            w = max(bbox[2] - bbox[0] + 1, 1)
            h = max(bbox[3] - bbox[1] + 1, 1)
            if not self.class_agnostic:
                mask_pred_ = mask_pred[i, label, :, :]
            else:
                mask_pred_ = mask_pred[i, 0, :, :]
            im_mask = np.zeros((img_h, img_w), dtype=np.float32)
            im_mask[bbox[1]:bbox[1] + h, bbox[0]:bbox[0] + w] = mmcv.resize(
                mask_pred_, (w, h))
            im_mask = np.array(
                im_mask > rcnn_test_cfg.mask_thr_binary, dtype=np.uint8)
            rle = mask_util.encode(
                np.array(im_mask[:, :, np.newaxis], order='F'))[0]
            cls_segms[label - 1].append(rle)
        return cls_segms
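A quick shape check for the default head (a sketch, assuming FCNMaskHead above is importable): four 3x3 convs keep the 14x14 RoI resolution, the 2x deconv doubles it, and the 1x1 conv emits one logit map per class:

import torch

head = FCNMaskHead()              # defaults: 4 convs, 2x deconv, 81 classes
roi_feats = torch.rand(2, 256, 14, 14)
mask_pred = head(roi_feats)
print(mask_pred.shape)            # torch.Size([2, 81, 28, 28])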
# --- misc.py (inferred from `from ..misc import multi_apply` below) ---
from functools import partial

from six.moves import map, zip


def multi_apply(func, *args, **kwargs):
    # apply func to each tuple of per-level (or per-image) args, then
    # transpose the list of result tuples into a tuple of lists
    pfunc = partial(func, **kwargs) if kwargs else func
    map_results = map(pfunc, *args)
    return tuple(map(list, zip(*map_results)))
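A small self-contained example of the transposition multi_apply performs:

def add_and_mul(a, b, scale=1):
    return a + b, a * b * scale

sums, prods = multi_apply(add_and_mul, [1, 2, 3], [4, 5, 6], scale=2)
assert sums == [5, 7, 9]
assert prods == [8, 20, 36]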
# --- neck package __init__ (inferred) ---
from .fpn import FPN

__all__ = ['FPN']
# --- fpn.py (inferred from the import above) ---
import torch.nn as nn
import torch.nn.functional as F

from ..common import ConvModule
from ..weight_init import xavier_init


class FPN(nn.Module):

    def __init__(self,
                 in_channels,
                 out_channels,
                 num_outs,
                 start_level=0,
                 end_level=-1,
                 add_extra_convs=False,
                 normalize=None,
                 activation=None):
        super(FPN, self).__init__()
        assert isinstance(in_channels, list)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_ins = len(in_channels)
        self.num_outs = num_outs
        self.activation = activation
        self.with_bias = normalize is None

        if end_level == -1:
            self.backbone_end_level = self.num_ins
            assert num_outs >= self.num_ins - start_level
        else:
            # if end_level < inputs, no extra level is allowed
            self.backbone_end_level = end_level
            assert end_level <= len(in_channels)
            assert num_outs == end_level - start_level
        self.start_level = start_level
        self.end_level = end_level
        self.add_extra_convs = add_extra_convs

        self.lateral_convs = nn.ModuleList()
        self.fpn_convs = nn.ModuleList()
        for i in range(self.start_level, self.backbone_end_level):
            l_conv = ConvModule(
                in_channels[i],
                out_channels,
                1,
                normalize=normalize,
                bias=self.with_bias,
                activation=self.activation,
                inplace=False)
            fpn_conv = ConvModule(
                out_channels,
                out_channels,
                3,
                padding=1,
                normalize=normalize,
                bias=self.with_bias,
                activation=self.activation,
                inplace=False)
            self.lateral_convs.append(l_conv)
            self.fpn_convs.append(fpn_conv)
            # lvl_id = i - self.start_level
            # setattr(self, 'lateral_conv{}'.format(lvl_id), l_conv)
            # setattr(self, 'fpn_conv{}'.format(lvl_id), fpn_conv)

        # add extra conv layers (e.g., RetinaNet)
        extra_levels = num_outs - self.backbone_end_level + self.start_level
        if add_extra_convs and extra_levels >= 1:
            for i in range(extra_levels):
                in_channels = (self.in_channels[self.backbone_end_level - 1]
                               if i == 0 else out_channels)
                extra_fpn_conv = ConvModule(
                    in_channels,
                    out_channels,
                    3,
                    stride=2,
                    padding=1,
                    normalize=normalize,
                    bias=self.with_bias,
                    activation=self.activation,
                    inplace=False)
                self.fpn_convs.append(extra_fpn_conv)

    # default init_weights for conv (msra) and norm in ConvModule
    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                xavier_init(m, distribution='uniform')

    def forward(self, inputs):
        assert len(inputs) == len(self.in_channels)

        # build laterals
        laterals = [
            lateral_conv(inputs[i + self.start_level])
            for i, lateral_conv in enumerate(self.lateral_convs)
        ]

        # build top-down path
        used_backbone_levels = len(laterals)
        for i in range(used_backbone_levels - 1, 0, -1):
            laterals[i - 1] += F.upsample(
                laterals[i], scale_factor=2, mode='nearest')

        # build outputs
        # part 1: from original levels
        outs = [
            self.fpn_convs[i](laterals[i])
            for i in range(used_backbone_levels)
        ]
        # part 2: add extra levels
        if self.num_outs > len(outs):
            # use max pool to get more levels on top of outputs
            # (e.g., Faster R-CNN, Mask R-CNN)
            if not self.add_extra_convs:
                for i in range(self.num_outs - used_backbone_levels):
                    outs.append(F.max_pool2d(outs[-1], 1, stride=2))
            # add conv layers on top of original feature maps (RetinaNet)
            else:
                orig = inputs[self.backbone_end_level - 1]
                outs.append(self.fpn_convs[used_backbone_levels](orig))
                for i in range(used_backbone_levels + 1, self.num_outs):
                    # BUG: we should add relu before each extra conv
                    outs.append(self.fpn_convs[i](outs[-1]))
        return tuple(outs)
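A shape check for the FPN above (a sketch, assuming FPN and its dependencies are importable; the channel counts mimic a ResNet-50-style backbone):

import torch

fpn = FPN(in_channels=[256, 512, 1024, 2048], out_channels=256, num_outs=5)
feats = [
    torch.rand(1, c, s, s)
    for c, s in zip([256, 512, 1024, 2048], [64, 32, 16, 8])
]
outs = fpn(feats)
# four FPN levels plus one extra max-pooled level (add_extra_convs=False)
print([o.shape[-1] for o in outs])  # [64, 32, 16, 8, 4]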
# --- RoI extractor package __init__ (inferred) ---
from .single_level import SingleLevelRoI

__all__ = ['SingleLevelRoI']
# --- single_level.py (inferred from the import above) ---
from __future__ import division

import torch
import torch.nn as nn

from mmdet import ops


class SingleLevelRoI(nn.Module):
    """Extract RoI features from a single level feature map.

    Each RoI is mapped to a level according to its scale.
    """

    def __init__(self,
                 roi_layer,
                 out_channels,
                 featmap_strides,
                 finest_scale=56):
        super(SingleLevelRoI, self).__init__()
        self.roi_layers = self.build_roi_layers(roi_layer, featmap_strides)
        self.out_channels = out_channels
        self.featmap_strides = featmap_strides
        self.finest_scale = finest_scale

    @property
    def num_inputs(self):
        return len(self.featmap_strides)

    def init_weights(self):
        pass

    def build_roi_layers(self, layer_cfg, featmap_strides):
        cfg = layer_cfg.copy()
        layer_type = cfg.pop('type')
        assert hasattr(ops, layer_type)
        layer_cls = getattr(ops, layer_type)
        roi_layers = nn.ModuleList(
            [layer_cls(spatial_scale=1 / s, **cfg) for s in featmap_strides])
        return roi_layers

    def map_roi_levels(self, rois, num_levels):
        """Map rois to corresponding feature levels (0-based) by scale.

        With the formula below (and 4 levels):
        - scale < finest_scale * 2: level 0
        - finest_scale * 2 <= scale < finest_scale * 4: level 1
        - finest_scale * 4 <= scale < finest_scale * 8: level 2
        - scale >= finest_scale * 8: level 3
        """
        scale = torch.sqrt(
            (rois[:, 3] - rois[:, 1] + 1) * (rois[:, 4] - rois[:, 2] + 1))
        target_lvls = torch.floor(
            torch.log2(scale / self.finest_scale + 1e-6))
        target_lvls = target_lvls.clamp(min=0, max=num_levels - 1).long()
        return target_lvls

    def forward(self, feats, rois):
        """Extract RoI features with the RoI layers.

        If multiple feature levels are used, rois are mapped to the
        corresponding levels according to their scales.
        """
        if len(feats) == 1:
            return self.roi_layers[0](feats[0], rois)

        out_size = self.roi_layers[0].out_size
        num_levels = len(feats)
        target_lvls = self.map_roi_levels(rois, num_levels)
        roi_feats = torch.cuda.FloatTensor(rois.size()[0], self.out_channels,
                                           out_size, out_size).fill_(0)
        for i in range(num_levels):
            inds = target_lvls == i
            if inds.any():
                rois_ = rois[inds, :]
                roi_feats_t = self.roi_layers[i](feats[i], rois_)
                roi_feats[inds] += roi_feats_t
        return roi_feats
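The level mapping can be checked by hand with the default finest_scale=56. Each roi is (batch_idx, x1, y1, x2, y2), scale = sqrt(w * h), and level = clamp(floor(log2(scale / 56)), 0, num_levels - 1):

import torch

rois = torch.tensor([
    [0., 0., 0., 55., 55.],     # scale 56  -> log2(1.0) = 0     -> level 0
    [0., 0., 0., 111., 111.],   # scale 112 -> log2(2.0) = 1     -> level 1
    [0., 0., 0., 223., 223.],   # scale 224 -> log2(4.0) = 2     -> level 2
    [0., 0., 0., 500., 500.],   # scale 501 -> log2(8.9) ~ 3.2   -> level 3
])
# SingleLevelRoI(...).map_roi_levels(rois, num_levels=4)
# -> tensor([0, 1, 2, 3])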
# --- RPN head package __init__ (inferred) ---
from .rpn_head import RPNHead

__all__ = ['RPNHead']
# --- rpn_head.py (inferred from the import above) ---
from __future__ import division

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from mmdet.core import (AnchorGenerator, anchor_target, bbox_transform_inv,
                        weighted_cross_entropy, weighted_smoothl1,
                        weighted_binary_cross_entropy)
from mmdet.ops import nms
from ..misc import multi_apply
from ..weight_init import normal_init


class RPNHead(nn.Module):

    def __init__(self,
                 in_channels,
                 feat_channels=512,
                 coarsest_stride=32,
                 anchor_scales=[8, 16, 32],
                 anchor_ratios=[0.5, 1.0, 2.0],
                 anchor_strides=[4, 8, 16, 32, 64],
                 anchor_base_sizes=None,
                 target_means=(.0, .0, .0, .0),
                 target_stds=(1.0, 1.0, 1.0, 1.0),
                 use_sigmoid_cls=False):
        super(RPNHead, self).__init__()
        self.in_channels = in_channels
        self.feat_channels = feat_channels
        self.coarsest_stride = coarsest_stride
        self.anchor_scales = anchor_scales
        self.anchor_ratios = anchor_ratios
        self.anchor_strides = anchor_strides
        self.anchor_base_sizes = (anchor_strides.copy()
                                  if anchor_base_sizes is None else
                                  anchor_base_sizes)
        self.target_means = target_means
        self.target_stds = target_stds
        self.use_sigmoid_cls = use_sigmoid_cls

        self.anchor_generators = []
        for anchor_base in self.anchor_base_sizes:
            self.anchor_generators.append(
                AnchorGenerator(anchor_base, anchor_scales, anchor_ratios))
        self.rpn_conv = nn.Conv2d(in_channels, feat_channels, 3, padding=1)
        self.relu = nn.ReLU(inplace=True)
        self.num_anchors = len(self.anchor_ratios) * len(self.anchor_scales)
        out_channels = (self.num_anchors
                        if self.use_sigmoid_cls else self.num_anchors * 2)
        self.rpn_cls = nn.Conv2d(feat_channels, out_channels, 1)
        self.rpn_reg = nn.Conv2d(feat_channels, self.num_anchors * 4, 1)
        self.debug_imgs = None

    def init_weights(self):
        normal_init(self.rpn_conv, std=0.01)
        normal_init(self.rpn_cls, std=0.01)
        normal_init(self.rpn_reg, std=0.01)

    def forward_single(self, x):
        rpn_feat = self.relu(self.rpn_conv(x))
        rpn_cls_score = self.rpn_cls(rpn_feat)
        rpn_bbox_pred = self.rpn_reg(rpn_feat)
        return rpn_cls_score, rpn_bbox_pred

    def forward(self, feats):
        return multi_apply(self.forward_single, feats)

    def get_anchors(self, featmap_sizes, img_shapes):
        """Get anchors for a list of feature map sizes, along with valid
        flags. (Extra padding regions should be marked as invalid.)
        """
        # calculate actual (padded) image shapes
        padded_img_shapes = []
        for img_shape in img_shapes:
            h, w = img_shape[:2]
            padded_h = int(
                np.ceil(h / self.coarsest_stride) * self.coarsest_stride)
            padded_w = int(
                np.ceil(w / self.coarsest_stride) * self.coarsest_stride)
            padded_img_shapes.append((padded_h, padded_w))
        # generate anchors for different feature levels
        # len = number of feature levels
        anchor_list = []
        # len = imgs per gpu
        valid_flag_list = [[] for _ in range(len(img_shapes))]
        for i in range(len(featmap_sizes)):
            anchor_stride = self.anchor_strides[i]
            anchors = self.anchor_generators[i].grid_anchors(
                featmap_sizes[i], anchor_stride)
            anchor_list.append(anchors)
            # for each image in this feature level, get valid flags
            featmap_size = featmap_sizes[i]
            for img_id, (h, w) in enumerate(padded_img_shapes):
                valid_feat_h = min(
                    int(np.ceil(h / anchor_stride)), featmap_size[0])
                valid_feat_w = min(
                    int(np.ceil(w / anchor_stride)), featmap_size[1])
                flags = self.anchor_generators[i].valid_flags(
                    featmap_size, (valid_feat_h, valid_feat_w))
                valid_flag_list[img_id].append(flags)
        return anchor_list, valid_flag_list

    def loss_single(self, rpn_cls_score, rpn_bbox_pred, labels,
                    label_weights, bbox_targets, bbox_weights,
                    num_total_samples, cfg):
        labels = labels.contiguous().view(-1)
        label_weights = label_weights.contiguous().view(-1)
        bbox_targets = bbox_targets.contiguous().view(-1, 4)
        bbox_weights = bbox_weights.contiguous().view(-1, 4)
        if self.use_sigmoid_cls:
            rpn_cls_score = rpn_cls_score.permute(0, 2, 3,
                                                  1).contiguous().view(-1)
            loss_cls = weighted_binary_cross_entropy(
                rpn_cls_score,
                labels,
                label_weights,
                ave_factor=num_total_samples)
        else:
            rpn_cls_score = rpn_cls_score.permute(0, 2, 3,
                                                  1).contiguous().view(-1, 2)
            loss_cls = weighted_cross_entropy(
                rpn_cls_score,
                labels,
                label_weights,
                ave_factor=num_total_samples)
        rpn_bbox_pred = rpn_bbox_pred.permute(0, 2, 3, 1).contiguous().view(
            -1, 4)
        loss_reg = weighted_smoothl1(
            rpn_bbox_pred,
            bbox_targets,
            bbox_weights,
            beta=cfg.smoothl1_beta,
            ave_factor=num_total_samples)
        return loss_cls, loss_reg

    def loss(self, rpn_cls_scores, rpn_bbox_preds, gt_bboxes, img_shapes,
             cfg):
        featmap_sizes = [featmap.size()[-2:] for featmap in rpn_cls_scores]
        assert len(featmap_sizes) == len(self.anchor_generators)
        anchor_list, valid_flag_list = self.get_anchors(
            featmap_sizes, img_shapes)
        cls_reg_targets = anchor_target(
            anchor_list, valid_flag_list, featmap_sizes, gt_bboxes,
            img_shapes, self.target_means, self.target_stds, cfg)
        if cls_reg_targets is None:
            return None
        (labels_list, label_weights_list, bbox_targets_list,
         bbox_weights_list, num_total_samples) = cls_reg_targets
        losses_cls, losses_reg = multi_apply(
            self.loss_single,
            rpn_cls_scores,
            rpn_bbox_preds,
            labels_list,
            label_weights_list,
            bbox_targets_list,
            bbox_weights_list,
            num_total_samples=num_total_samples,
            cfg=cfg)
        return dict(loss_rpn_cls=losses_cls, loss_rpn_reg=losses_reg)

    def get_proposals(self, rpn_cls_scores, rpn_bbox_preds, img_shapes, cfg):
        img_per_gpu = len(img_shapes)
        featmap_sizes = [featmap.size()[-2:] for featmap in rpn_cls_scores]
        mlvl_anchors = [
            self.anchor_generators[idx].grid_anchors(
                featmap_sizes[idx], self.anchor_strides[idx])
            for idx in range(len(featmap_sizes))
        ]
        proposal_list = []
        for img_id in range(img_per_gpu):
            rpn_cls_score_list = [
                rpn_cls_scores[idx][img_id].detach()
                for idx in range(len(rpn_cls_scores))
            ]
            rpn_bbox_pred_list = [
                rpn_bbox_preds[idx][img_id].detach()
                for idx in range(len(rpn_bbox_preds))
            ]
            assert len(rpn_cls_score_list) == len(rpn_bbox_pred_list)
            img_shape = img_shapes[img_id]
            proposals = self._get_proposals_single(
                rpn_cls_score_list, rpn_bbox_pred_list, mlvl_anchors,
                img_shape, cfg)
            proposal_list.append(proposals)
        return proposal_list

    def _get_proposals_single(self, rpn_cls_scores, rpn_bbox_preds,
                              mlvl_anchors, img_shape, cfg):
        mlvl_proposals = []
        for idx in range(len(rpn_cls_scores)):
            rpn_cls_score = rpn_cls_scores[idx]
            rpn_bbox_pred = rpn_bbox_preds[idx]
            assert rpn_cls_score.size()[-2:] == rpn_bbox_pred.size()[-2:]
            anchors = mlvl_anchors[idx]
            if self.use_sigmoid_cls:
                rpn_cls_score = rpn_cls_score.permute(
                    1, 2, 0).contiguous().view(-1)
                rpn_cls_prob = F.sigmoid(rpn_cls_score)
                scores = rpn_cls_prob
            else:
                rpn_cls_score = rpn_cls_score.permute(
                    1, 2, 0).contiguous().view(-1, 2)
                rpn_cls_prob = F.softmax(rpn_cls_score, dim=1)
                scores = rpn_cls_prob[:, 1]
            rpn_bbox_pred = rpn_bbox_pred.permute(1, 2, 0).contiguous().view(
                -1, 4)
            _, order = scores.sort(0, descending=True)
            if cfg.nms_pre > 0:
                order = order[:cfg.nms_pre]
            rpn_bbox_pred = rpn_bbox_pred[order, :]
            anchors = anchors[order, :]
            scores = scores[order]
            proposals = bbox_transform_inv(anchors, rpn_bbox_pred,
                                           self.target_means,
                                           self.target_stds, img_shape)
            w = proposals[:, 2] - proposals[:, 0] + 1
            h = proposals[:, 3] - proposals[:, 1] + 1
            valid_inds = torch.nonzero((w >= cfg.min_bbox_size) &
                                       (h >= cfg.min_bbox_size)).squeeze()
            proposals = proposals[valid_inds, :]
            scores = scores[valid_inds]
            proposals = torch.cat([proposals, scores.unsqueeze(-1)], dim=-1)
            nms_keep = nms(proposals, cfg.nms_thr)[:cfg.nms_post]
            proposals = proposals[nms_keep, :]
            mlvl_proposals.append(proposals)
        proposals = torch.cat(mlvl_proposals, 0)
        if cfg.nms_across_levels:
            nms_keep = nms(proposals, cfg.nms_thr)[:cfg.max_num]
            proposals = proposals[nms_keep, :]
        else:
            scores = proposals[:, 4]
            _, order = scores.sort(0, descending=True)
            num = min(cfg.max_num, proposals.shape[0])
            order = order[:num]
            proposals = proposals[order, :]
        return proposals
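To make the padding arithmetic in get_anchors concrete, here is one illustrative image (a sketch with made-up numbers): the image is padded up to the coarsest stride, and only the feature cells covering the un-padded image are marked valid:

import numpy as np

coarsest_stride = 32
h, w = 600, 1000                  # image shape before padding
padded_h = int(np.ceil(h / coarsest_stride) * coarsest_stride)  # 608
padded_w = int(np.ceil(w / coarsest_stride) * coarsest_stride)  # 1024

anchor_stride = 16                # one feature level
featmap_size = (padded_h // anchor_stride,
                padded_w // anchor_stride)                      # (38, 64)
valid_feat_h = min(int(np.ceil(h / anchor_stride)), featmap_size[0])  # 38
valid_feat_w = min(int(np.ceil(w / anchor_stride)), featmap_size[1])  # 63
# the last column of feature cells covers only padding and is invalid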
# --- weight_init.py (inferred from `from ..weight_init import ...` above) ---
import torch.nn as nn


def xavier_init(module, gain=1, bias=0, distribution='normal'):
    assert distribution in ['uniform', 'normal']
    if distribution == 'uniform':
        nn.init.xavier_uniform_(module.weight, gain=gain)
    else:
        nn.init.xavier_normal_(module.weight, gain=gain)
    # layers built with bias=False have module.bias set to None
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)


def normal_init(module, mean=0, std=1, bias=0):
    nn.init.normal_(module.weight, mean, std)
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)


def uniform_init(module, a=0, b=1, bias=0):
    nn.init.uniform_(module.weight, a, b)
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)


def kaiming_init(module,
                 mode='fan_out',
                 nonlinearity='relu',
                 bias=0,
                 distribution='normal'):
    assert distribution in ['uniform', 'normal']
    if distribution == 'uniform':
        nn.init.kaiming_uniform_(
            module.weight, mode=mode, nonlinearity=nonlinearity)
    else:
        nn.init.kaiming_normal_(
            module.weight, mode=mode, nonlinearity=nonlinearity)
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)
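Usage is straightforward; the bias guard makes the helpers safe for layers built with bias=False (a small sketch):

import torch.nn as nn

conv = nn.Conv2d(16, 32, 3, bias=False)   # conv.bias is None
kaiming_init(conv)                        # skips the missing bias
normal_init(nn.Conv2d(16, 32, 1), std=0.01)
xavier_init(nn.Linear(128, 10), distribution='uniform')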
# --- package __init__ re-exporting the parallel wrappers (inferred) ---
from .parallel import MMDataParallel, MMDistributedDataParallel

# --- parallel/__init__.py (inferred) ---
from .data_parallel import MMDataParallel
from .distributed import MMDistributedDataParallel
# NOTE: scatter_kwargs is imported here but is not defined in the
# scatter_gather code shown below; presumably still WIP
from .scatter_gather import scatter, scatter_kwargs

__all__ = [
    'MMDataParallel', 'MMDistributedDataParallel', 'scatter', 'scatter_kwargs'
]
# --- scatter_gather.py (inferred from the imports above) ---
import torch
from torch.nn.parallel._functions import _get_stream


def scatter(input, devices, streams=None):
    """Scatter tensors across multiple GPUs."""
    if streams is None:
        streams = [None] * len(devices)

    if isinstance(input, list):
        chunk_size = (len(input) - 1) // len(devices) + 1
        outputs = [
            scatter(input[i], [devices[i // chunk_size]],
                    [streams[i // chunk_size]]) for i in range(len(input))
        ]
        return outputs
    elif isinstance(input, torch.Tensor):
        output = input.contiguous()
        # TODO: copy to a pinned buffer first (if copying from CPU)
        stream = streams[0] if output.numel() > 0 else None
        with torch.cuda.device(devices[0]), torch.cuda.stream(stream):
            output = output.cuda(devices[0], non_blocking=True)
        return output
    else:
        raise Exception('Unknown type {}.'.format(type(input)))


def synchronize_stream(output, devices, streams):
    if isinstance(output, list):
        chunk_size = len(output) // len(devices)
        for i in range(len(devices)):
            for j in range(chunk_size):
                synchronize_stream(output[i * chunk_size + j], [devices[i]],
                                   [streams[i]])
    elif isinstance(output, torch.Tensor):
        if output.numel() != 0:
            with torch.cuda.device(devices[0]):
                main_stream = torch.cuda.current_stream()
                main_stream.wait_stream(streams[0])
                output.record_stream(main_stream)
    else:
        raise Exception('Unknown type {}.'.format(type(output)))


def get_input_device(input):
    if isinstance(input, list):
        for item in input:
            input_device = get_input_device(item)
            if input_device != -1:
                return input_device
        return -1
    elif isinstance(input, torch.Tensor):
        return input.get_device() if input.is_cuda else -1
    else:
        raise Exception('Unknown type {}.'.format(type(input)))


class Scatter(object):

    @staticmethod
    def forward(target_gpus, input):
        input_device = get_input_device(input)
        streams = None
        if input_device == -1:
            # perform CPU to GPU copies in a background stream
            streams = [_get_stream(device) for device in target_gpus]
        outputs = scatter(input, target_gpus, streams)
        # synchronize with the copy stream
        if streams is not None:
            synchronize_stream(outputs, target_gpus, streams)
        return tuple(outputs)
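The list branch of scatter assigns contiguous chunks of the input list to devices via ceil division; for example:

inputs = list(range(5))     # 5 items
devices = [0, 1]            # 2 GPUs
chunk_size = (len(inputs) - 1) // len(devices) + 1   # ceil(5 / 2) = 3
assignment = [devices[i // chunk_size] for i in range(len(inputs))]
assert assignment == [0, 0, 0, 1, 1]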
# --- data_parallel.py (inferred from the imports above) ---
from torch.nn.parallel import DataParallel

from .scatter_gather import scatter_kwargs


class MMDataParallel(DataParallel):

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
# --- distributed.py (inferred from the imports above) ---
from torch.nn.parallel import DistributedDataParallel

from .scatter_gather import scatter_kwargs


class MMDistributedDataParallel(DistributedDataParallel):

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
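Both wrappers override only scatter, so inputs and kwargs flow through the custom scatter_kwargs instead of torch's default tensor-only scatter. Usage mirrors the stock classes (a sketch, assuming a CUDA device, a constructed model, and a working scatter_kwargs, which the WIP code above does not yet define):

import torch.nn as nn

model = nn.Conv2d(3, 8, 3).cuda()   # stand-in for a detector
parallel_model = MMDataParallel(model, device_ids=[0])
# parallel_model(*inputs) now scatters via scatter_kwargs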