Unverified Commit 7d343fd2 authored by Kai Chen, committed by GitHub

Merge pull request #8 from hellock/dev

API cleaning and code refactoring (WIP)
parents 0e0b9246 630687f4
import collections
import torch
import torch.nn.functional as F
from torch.utils.data.dataloader import default_collate
from .utils import DataContainer
# https://github.com/pytorch/pytorch/issues/973
import resource
rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, rlimit[1]))
__all__ = ['collate']
def collate(batch, samples_per_gpu=1):
if not isinstance(batch, collections.Sequence):
raise TypeError("{} is not supported.".format(type(batch)))
if isinstance(batch[0], DataContainer):
assert len(batch) % samples_per_gpu == 0
stacked = []
if batch[0].stack:
for i in range(0, len(batch), samples_per_gpu):
assert isinstance(batch[i].data, torch.Tensor)
# TODO: handle tensors other than 3d
assert batch[i].dim() == 3
c, h, w = batch[i].size()
for sample in batch[i:i + samples_per_gpu]:
assert c == sample.size(0)
h = max(h, sample.size(1))
w = max(w, sample.size(2))
padded_samples = [
F.pad(
sample.data,
(0, w - sample.size(2), 0, h - sample.size(1)),
value=sample.padding_value)
for sample in batch[i:i + samples_per_gpu]
]
stacked.append(default_collate(padded_samples))
else:
for i in range(0, len(batch), samples_per_gpu):
stacked.append(
[sample.data for sample in batch[i:i + samples_per_gpu]])
return DataContainer(stacked, batch[0].stack, batch[0].padding_value)
elif isinstance(batch[0], collections.Sequence):
transposed = zip(*batch)
return [collate(samples, samples_per_gpu) for samples in transposed]
elif isinstance(batch[0], collections.Mapping):
return {
key: collate([d[key] for d in batch], samples_per_gpu)
for key in batch[0]
}
else:
return default_collate(batch)
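# Hypothetical helper (not part of this commit) sketching how collate() treats
# stacked DataContainers: within each GPU group, tensors are padded to the
# group's max height/width before default_collate stacks them.
def _demo_collate():
    a = DataContainer(torch.zeros(3, 4, 5), stack=True, padding_value=0)
    b = DataContainer(torch.zeros(3, 6, 3), stack=True, padding_value=0)
    out = collate([a, b], samples_per_gpu=2)
    # out.data == [tensor of shape (2, 3, 6, 5)]: both samples are padded to
    # the per-group maximum of (6, 5) before stacking.
    return out.data[0].shape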
from functools import partial
import torch
from .coco import CocoDataset
from .collate import collate
from .sampler import GroupSampler, DistributedGroupSampler
def build_data(cfg, args):
dataset = CocoDataset(**cfg)
if args.dist:
sampler = DistributedGroupSampler(dataset, args.img_per_gpu,
args.world_size, args.rank)
batch_size = args.img_per_gpu
num_workers = args.data_workers
else:
sampler = GroupSampler(dataset, args.img_per_gpu)
batch_size = args.world_size * args.img_per_gpu
num_workers = args.world_size * args.data_workers
loader = torch.utils.data.DataLoader(
dataset,
batch_size=batch_size,
sampler=sampler,
num_workers=num_workers,
collate_fn=partial(collate, samples_per_gpu=args.img_per_gpu),
pin_memory=False)
return loader
from .build_loader import build_dataloader
from .sampler import GroupSampler, DistributedGroupSampler
__all__ = [
'GroupSampler', 'DistributedGroupSampler', 'build_dataloader'
]
from functools import partial
from mmcv.runner import get_dist_info
from mmcv.parallel import collate
from torch.utils.data import DataLoader
from .sampler import GroupSampler, DistributedGroupSampler
# https://github.com/pytorch/pytorch/issues/973
import resource
rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (4096, rlimit[1]))
def build_dataloader(dataset,
imgs_per_gpu,
workers_per_gpu,
num_gpus,
dist=True,
**kwargs):
if dist:
rank, world_size = get_dist_info()
sampler = DistributedGroupSampler(dataset, imgs_per_gpu, world_size,
rank)
batch_size = imgs_per_gpu
num_workers = workers_per_gpu
else:
sampler = GroupSampler(dataset, imgs_per_gpu)
batch_size = num_gpus * imgs_per_gpu
num_workers = num_gpus * workers_per_gpu
if not kwargs.get('shuffle', True):
sampler = None
data_loader = DataLoader(
dataset,
batch_size=batch_size,
sampler=sampler,
num_workers=num_workers,
collate_fn=partial(collate, samples_per_gpu=imgs_per_gpu),
pin_memory=False,
**kwargs)
return data_loader
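# Hypothetical sketch (not part of this commit) of calling build_dataloader()
# in non-distributed mode. GroupSampler is assumed to only need a `flag`
# array that assigns each sample to an aspect-ratio group, so a dummy dataset
# stands in for CocoDataset here; all numbers are illustrative.
def _demo_build_dataloader():
    import numpy as np
    import torch
    from torch.utils.data import Dataset

    class _DummyDataset(Dataset):

        def __init__(self, num=8):
            self.flag = np.zeros(num, dtype=np.int64)  # one ratio group

        def __len__(self):
            return len(self.flag)

        def __getitem__(self, idx):
            return dict(img=torch.zeros(3, 32, 32))

    loader = build_dataloader(
        _DummyDataset(), imgs_per_gpu=2, workers_per_gpu=0, num_gpus=1,
        dist=False)
    # each batch is a dict; collate() stacks 'img' to shape (2, 3, 32, 32)
    return next(iter(loader))['img'].shape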
@@ -7,8 +7,6 @@ import numpy as np
from torch.distributed import get_world_size, get_rank
from torch.utils.data.sampler import Sampler
__all__ = ['GroupSampler', 'DistributedGroupSampler']
class GroupSampler(Sampler):
......
@@ -2,15 +2,12 @@ import mmcv
import numpy as np
import torch
from mmdet.core.mask_ops import segms
__all__ = [
'ImageTransform', 'BboxTransform', 'PolyMaskTransform', 'Numpy2Tensor'
]
__all__ = ['ImageTransform', 'BboxTransform', 'MaskTransform', 'Numpy2Tensor']
class ImageTransform(object):
"""Preprocess an image
"""Preprocess an image.
1. rescale the image to expected size
2. normalize the image
3. flip the image (if needed)
@@ -29,90 +26,38 @@ class ImageTransform(object):
self.size_divisor = size_divisor
def __call__(self, img, scale, flip=False):
img, scale_factor = mmcv.imrescale(img, scale, True)
img, scale_factor = mmcv.imrescale(img, scale, return_scale=True)
img_shape = img.shape
img = mmcv.imnorm(img, self.mean, self.std, self.to_rgb)
img = mmcv.imnormalize(img, self.mean, self.std, self.to_rgb)
if flip:
img = mmcv.imflip(img)
if self.size_divisor is not None:
img = mmcv.impad_to_multiple(img, self.size_divisor)
pad_shape = img.shape
else:
pad_shape = img_shape
img = img.transpose(2, 0, 1)
return img, img_shape, scale_factor
# img, scale = cvb.resize_keep_ar(img_or_path, max_long_edge,
# max_short_edge, True)
# shape_scale = np.array(img.shape + (scale, ), dtype=np.float32)
# if flip:
# img = img[:, ::-1, :].copy()
# if self.color_order == 'RGB':
# img = cvb.bgr2rgb(img)
# img = img.astype(np.float32)
# img -= self.color_mean
# img /= self.color_std
# if self.size_divisor is None:
# padded_img = img
# else:
# pad_h = int(np.ceil(
# img.shape[0] / self.size_divisor)) * self.size_divisor
# pad_w = int(np.ceil(
# img.shape[1] / self.size_divisor)) * self.size_divisor
# padded_img = cvb.pad_img(img, (pad_h, pad_w), pad_val=0)
# padded_img = padded_img.transpose(2, 0, 1)
# return padded_img, shape_scale
class ImageCrop(object):
"""crop image patches and resize patches into fixed size
1. (read and) flip image (if needed)
2. crop image patches according to given bboxes
3. resize patches into fixed size (default 224x224)
4. normalize the image (if needed)
5. transpose to (c, h, w) (if needed)
"""
return img, img_shape, pad_shape, scale_factor
def __init__(self,
normalize=True,
transpose=True,
color_order='RGB',
color_mean=(0, 0, 0),
color_std=(1, 1, 1)):
self.normalize = normalize
self.transpose = transpose
assert color_order in ['RGB', 'BGR']
self.color_order = color_order
self.color_mean = np.array(color_mean, dtype=np.float32)
self.color_std = np.array(color_std, dtype=np.float32)
def __call__(self,
img_or_path,
bboxes,
crop_size,
scale_ratio=1.0,
flip=False):
img = cvb.read_img(img_or_path)
if flip:
img = img[:, ::-1, :].copy()
crop_imgs = cvb.crop_img(
img,
bboxes[:, :4],
scale_ratio=scale_ratio,
pad_fill=self.color_mean)
processed_crop_imgs_list = []
for i in range(len(crop_imgs)):
crop_img = crop_imgs[i]
crop_img = cvb.resize(crop_img, crop_size)
crop_img = crop_img.astype(np.float32)
crop_img -= self.color_mean
crop_img /= self.color_std
processed_crop_imgs_list.append(crop_img)
processed_crop_imgs = np.stack(processed_crop_imgs_list, axis=0)
processed_crop_imgs = processed_crop_imgs.transpose(0, 3, 1, 2)
return processed_crop_imgs
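# Hypothetical sketch (not part of this commit) of the new
# ImageTransform.__call__ contract above, assuming the usual
# mean/std/to_rgb/size_divisor constructor arguments that are collapsed in
# this diff; the normalization values are placeholders.
def _demo_image_transform():
    img_transform = ImageTransform(
        mean=(123.675, 116.28, 103.53),
        std=(58.395, 57.12, 57.375),
        to_rgb=True,
        size_divisor=32)
    dummy = (np.random.rand(480, 640, 3) * 255).astype(np.float32)
    img, img_shape, pad_shape, scale_factor = img_transform(
        dummy, scale=(1333, 800), flip=True)
    # img is channel-first (C, H, W); img_shape is the shape after rescaling
    # and pad_shape is padded up to a multiple of size_divisor.
    return img.shape, img_shape, pad_shape, scale_factor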
def bbox_flip(bboxes, img_shape):
"""Flip bboxes horizontally.
Args:
bboxes(ndarray): shape (..., 4*k)
img_shape(tuple): (height, width)
"""
assert bboxes.shape[-1] % 4 == 0
w = img_shape[1]
flipped = bboxes.copy()
flipped[..., 0::4] = w - bboxes[..., 2::4] - 1
flipped[..., 2::4] = w - bboxes[..., 0::4] - 1
return flipped
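# Worked example for bbox_flip() (illustrative, not part of this commit): for
# an image of width 100, x1=10 / x2=30 map to 100-30-1=69 / 100-10-1=89, so
# the box keeps its width and mirrors about the vertical centre line.
def _demo_bbox_flip():
    boxes = np.array([[10., 20., 30., 40.]])
    return bbox_flip(boxes, img_shape=(50, 100))  # [[69., 20., 89., 40.]]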
class BboxTransform(object):
"""Preprocess gt bboxes
"""Preprocess gt bboxes.
1. rescale bboxes according to image size
2. flip bboxes (if needed)
3. pad the first dimension to `max_num_gts`
@@ -124,7 +69,7 @@ class BboxTransform(object):
def __call__(self, bboxes, img_shape, scale_factor, flip=False):
gt_bboxes = bboxes * scale_factor
if flip:
gt_bboxes = mmcv.bbox_flip(gt_bboxes, img_shape)
gt_bboxes = bbox_flip(gt_bboxes, img_shape)
gt_bboxes[:, 0::2] = np.clip(gt_bboxes[:, 0::2], 0, img_shape[1])
gt_bboxes[:, 1::2] = np.clip(gt_bboxes[:, 1::2], 0, img_shape[0])
if self.max_num_gts is None:
@@ -136,64 +81,25 @@ class BboxTransform(object):
return padded_bboxes
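# Hypothetical sketch (not part of this commit) of BboxTransform: boxes are
# scaled by scale_factor, flipped via bbox_flip() and clipped to the image.
# Assuming max_num_gts defaults to None (its handling is collapsed in this
# diff), the boxes come back unpadded.
def _demo_bbox_transform():
    bbox_transform = BboxTransform()
    boxes = np.array([[10., 20., 30., 40.]])
    # img_shape and scale_factor as produced by ImageTransform above
    return bbox_transform(
        boxes, img_shape=(200, 100, 3), scale_factor=2.0, flip=True)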
class PolyMaskTransform(object):
def __init__(self):
pass
def __call__(self, gt_mask_polys, gt_poly_lens, img_h, img_w, flip=False):
"""
Args:
gt_mask_polys(list): a list of masks, each mask is a list of polys,
each poly is a list of numbers
gt_poly_lens(list): a list of int, indicating the size of each poly
"""
if flip:
gt_mask_polys = segms.flip_segms(gt_mask_polys, img_h, img_w)
num_polys_per_mask = np.array(
[len(mask_polys) for mask_polys in gt_mask_polys], dtype=np.int64)
gt_poly_lens = np.array(gt_poly_lens, dtype=np.int64)
gt_mask_polys = [
np.concatenate(mask_polys).astype(np.float32)
for mask_polys in gt_mask_polys
]
gt_mask_polys = np.concatenate(gt_mask_polys)
return gt_mask_polys, gt_poly_lens, num_polys_per_mask
class MaskTransform(object):
"""Preprocess masks
"""Preprocess masks.
1. resize masks to expected size and stack to a single array
2. flip the masks (if needed)
3. pad the masks (if needed)
"""
def __init__(self, max_num_gts, pad_size=None):
self.max_num_gts = max_num_gts
self.pad_size = pad_size
def __call__(self, masks, img_size, flip=False):
max_long_edge = max(img_size)
max_short_edge = min(img_size)
def __call__(self, masks, pad_shape, scale_factor, flip=False):
masks = [
cvb.resize_keep_ar(
mask,
max_long_edge,
max_short_edge,
interpolation=cvb.INTER_NEAREST) for mask in masks
mmcv.imrescale(mask, scale_factor, interpolation='nearest')
for mask in masks
]
masks = np.stack(masks, axis=0)
if flip:
masks = masks[:, ::-1, :]
if self.pad_size is None:
pad_h = masks.shape[1]
pad_w = masks.shape[2]
else:
pad_size = self.pad_size if self.pad_size > 0 else max_long_edge
pad_h = pad_w = pad_size
padded_masks = np.zeros(
(self.max_num_gts, pad_h, pad_w), dtype=masks.dtype)
padded_masks[:masks.shape[0], :masks.shape[1], :masks.shape[2]] = masks
masks = [mask[:, ::-1] for mask in masks]
padded_masks = [
mmcv.impad(mask, pad_shape[:2], pad_val=0) for mask in masks
]
padded_masks = np.stack(padded_masks, axis=0)
return padded_masks
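# Hypothetical sketch (not part of this commit) of the new MaskTransform: each
# binary mask is rescaled with nearest-neighbour interpolation, flipped, then
# padded to pad_shape so it stays aligned with the padded image.
def _demo_mask_transform():
    mask_transform = MaskTransform()
    masks = [np.zeros((100, 80), dtype=np.uint8)]
    padded = mask_transform(
        masks, pad_shape=(128, 96, 3), scale_factor=1.2, flip=True)
    return padded.shape  # (1, 128, 96)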
......
from collections import Sequence
import mmcv
import torch
import matplotlib.pyplot as plt
import numpy as np
import pycocotools.mask as maskUtils
def to_tensor(data):
"""Convert objects of various python types to :obj:`torch.Tensor`.
Supported types are: :class:`numpy.ndarray`, :class:`torch.Tensor`,
:class:`Sequence`, :class:`int` and :class:`float`.
"""
if isinstance(data, torch.Tensor):
return data
elif isinstance(data, np.ndarray):
return torch.from_numpy(data)
elif isinstance(data, Sequence) and not mmcv.is_str(data):
return torch.tensor(data)
elif isinstance(data, int):
return torch.LongTensor([data])
elif isinstance(data, float):
return torch.FloatTensor([data])
else:
raise TypeError('type {} cannot be converted to tensor.'.format(
type(data)))
def random_scale(img_scales, mode='range'):
@@ -44,19 +67,3 @@ def show_ann(coco, img, ann_info):
plt.axis('off')
coco.showAnns(ann_info)
plt.show()
def draw_bbox_and_segm(img, results, dataset, score_thr=0.5):
bbox_results, segm_results = results
hi_bboxes = []
for cls_bboxes, cls_segms in zip(bbox_results, segm_results):
if len(cls_bboxes) == 0:
hi_bboxes.append(cls_bboxes)
continue
inds = np.where(cls_bboxes[:, -1] > score_thr)[0]
hi_bboxes.append(cls_bboxes[inds, :])
color_mask = np.random.random((1, 3))
for i in inds:
mask = maskUtils.decode(cls_segms[i]).astype(np.bool)
img[mask] = img[mask] * 0.5 + color_mask * 0.5
mmcv.draw_bboxes_with_label(np.ascontiguousarray(img), hi_bboxes, dataset)
from .data_container import DataContainer
from .misc import *
import functools
from collections import Sequence
import mmcv
import numpy as np
import torch
def to_tensor(data):
"""Convert objects of various python types to :obj:`torch.Tensor`.
Supported types are: :class:`numpy.ndarray`, :class:`torch.Tensor`,
:class:`Sequence`, :class:`int` and :class:`float`.
"""
if isinstance(data, np.ndarray):
return torch.from_numpy(data)
elif isinstance(data, torch.Tensor):
return data
elif isinstance(data, Sequence) and not mmcv.is_str(data):
return torch.tensor(data)
elif isinstance(data, int):
return torch.LongTensor([data])
elif isinstance(data, float):
return torch.FloatTensor([data])
else:
raise TypeError('type {} cannot be converted to tensor.'.format(
type(data)))
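# Illustrative examples (not part of this commit) of the conversions performed
# by to_tensor(): ndarray -> tensor sharing memory, list -> tensor,
# int / float -> one-element LongTensor / FloatTensor.
def _demo_to_tensor():
    return (to_tensor(np.ones((2, 2))).shape,  # torch.Size([2, 2])
            to_tensor([1, 2, 3]).shape,        # torch.Size([3])
            to_tensor(5),                      # tensor([5])
            to_tensor(0.5))                    # tensor([0.5000])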
def assert_tensor_type(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not isinstance(args[0].data, torch.Tensor):
raise AttributeError('{} has no attribute {} for type {}'.format(
args[0].__class__.__name__, func.__name__, args[0].datatype))
return func(*args, **kwargs)
return wrapper
class DataContainer(object):
def __init__(self, data, stack=False, padding_value=0):
if isinstance(data, list):
self._data = data
else:
self._data = to_tensor(data)
self._stack = stack
self._padding_value = padding_value
def __repr__(self):
return '{}({})'.format(self.__class__.__name__, repr(self.data))
@property
def data(self):
return self._data
@property
def datatype(self):
if isinstance(self.data, torch.Tensor):
return self.data.type()
else:
return type(self.data)
@property
def stack(self):
return self._stack
@property
def padding_value(self):
return self._padding_value
@assert_tensor_type
def size(self, *args, **kwargs):
return self.data.size(*args, **kwargs)
@assert_tensor_type
def dim(self):
return self.data.dim()
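# Hypothetical sketch (not part of this commit): DataContainer keeps list data
# as-is and converts everything else with to_tensor(); size()/dim() are only
# valid for tensor data thanks to the assert_tensor_type decorator above.
def _demo_data_container():
    dc = DataContainer(np.zeros((3, 4, 5)), stack=True, padding_value=0)
    assert dc.size() == (3, 4, 5) and dc.dim() == 3
    meta = DataContainer([dict(ori_shape=(480, 640, 3))])  # list kept as-is
    try:
        meta.size()
    except AttributeError:
        pass  # non-tensor data: size() raises via assert_tensor_type
    return dc, meta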
from .detectors import Detector
from .detectors import BaseDetector, RPN, FasterRCNN, MaskRCNN
from .builder import (build_backbone, build_neck, build_rpn_head,
build_roi_extractor, build_bbox_head, build_mask_head, build_detector)
__all__ = [
'BaseDetector', 'RPN', 'FasterRCNN', 'MaskRCNN', 'build_backbone',
'build_neck', 'build_rpn_head', 'build_roi_extractor', 'build_bbox_head',
'build_mask_head', 'build_detector'
]
from .resnet import resnet
__all__ = ['resnet']
import logging
import math
import torch.nn as nn
import torch.utils.checkpoint as cp
from torchpack import load_checkpoint
from mmcv.runner import load_checkpoint
def conv3x3(in_planes, out_planes, stride=1, dilation=1):
@@ -25,7 +27,7 @@ class BasicBlock(nn.Module):
stride=1,
dilation=1,
downsample=None,
style='fb'):
style='pytorch'):
super(BasicBlock, self).__init__()
self.conv1 = conv3x3(inplanes, planes, stride, dilation)
self.bn1 = nn.BatchNorm2d(planes)
@@ -64,15 +66,16 @@ class Bottleneck(nn.Module):
stride=1,
dilation=1,
downsample=None,
style='fb',
style='pytorch',
with_cp=False):
"""Bottleneck block
if style is "fb", the stride-two layer is the 3x3 conv layer,
if style is "msra", the stride-two layer is the first 1x1 conv layer
"""Bottleneck block.
If style is "pytorch", the stride-two layer is the 3x3 conv layer,
if it is "caffe", the stride-two layer is the first 1x1 conv layer.
"""
super(Bottleneck, self).__init__()
assert style in ['fb', 'msra']
if style == 'fb':
assert style in ['pytorch', 'caffe']
if style == 'pytorch':
conv1_stride = 1
conv2_stride = stride
else:
@@ -139,7 +142,7 @@ def make_res_layer(block,
blocks,
stride=1,
dilation=1,
style='fb',
style='pytorch',
with_cp=False):
downsample = None
if stride != 1 or inplanes != planes * block.expansion:
@@ -173,7 +176,12 @@ def make_res_layer(block,
class ResHead(nn.Module):
def __init__(self, block, num_blocks, stride=2, dilation=1, style='fb'):
def __init__(self,
block,
num_blocks,
stride=2,
dilation=1,
style='pytorch'):
self.layer4 = make_res_layer(
block,
1024,
@@ -196,9 +204,10 @@ class ResNet(nn.Module):
dilations=(1, 1, 1, 1),
out_indices=(0, 1, 2, 3),
frozen_stages=-1,
style='fb',
style='pytorch',
sync_bn=False,
with_cp=False):
with_cp=False,
strict_frozen=False):
super(ResNet, self).__init__()
if not len(layers) == len(strides) == len(dilations):
raise ValueError(
@@ -234,14 +243,17 @@ class ResNet(nn.Module):
style=self.style,
with_cp=with_cp)
self.inplanes = planes * block.expansion
setattr(self, layer_name, res_layer)
self.add_module(layer_name, res_layer)
self.res_layers.append(layer_name)
self.feat_dim = block.expansion * 64 * 2**(len(layers) - 1)
self.with_cp = with_cp
self.strict_frozen = strict_frozen
def init_weights(self, pretrained=None):
if isinstance(pretrained, str):
load_checkpoint(self, pretrained, strict=False)
logger = logging.getLogger()
load_checkpoint(self, pretrained, strict=False, logger=logger)
elif pretrained is None:
for m in self.modules():
if isinstance(m, nn.Conv2d):
@@ -275,6 +287,9 @@ class ResNet(nn.Module):
for m in self.modules():
if isinstance(m, nn.BatchNorm2d):
m.eval()
if self.strict_frozen:
for params in m.parameters():
params.requires_grad = False
if mode and self.frozen_stages >= 0:
for param in self.conv1.parameters():
param.requires_grad = False
@@ -305,9 +320,10 @@ def resnet(depth,
dilations=(1, 1, 1, 1),
out_indices=(2, ),
frozen_stages=-1,
style='fb',
style='pytorch',
sync_bn=False,
with_cp=False):
with_cp=False,
strict_frozen=False):
"""Constructs a ResNet model.
Args:
@@ -321,5 +337,5 @@ def resnet(depth,
raise KeyError('invalid depth {} for resnet'.format(depth))
block, layers = resnet_cfg[depth]
model = ResNet(block, layers[:num_stages], strides, dilations, out_indices,
frozen_stages, style, sync_bn, with_cp)
frozen_stages, style, sync_bn, with_cp, strict_frozen)
return model
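# Hypothetical sketch (not part of this commit) of building a backbone with
# the resnet() factory and the renamed 'pytorch'/'caffe' style argument,
# assuming the usual num_stages/strides defaults and a forward() that returns
# one feature map per entry in out_indices (both collapsed in this diff).
def _demo_resnet():
    import torch
    backbone = resnet(
        depth=50,
        num_stages=4,
        out_indices=(0, 1, 2, 3),
        frozen_stages=1,
        style='pytorch')
    backbone.init_weights(pretrained=None)
    feats = backbone(torch.rand(1, 3, 224, 224))
    # feature maps at strides 4 / 8 / 16 / 32 of the input resolution
    return [f.shape for f in feats]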
import torch.nn as nn
import torch.nn.functional as F
from mmdet.core import (bbox_transform_inv, multiclass_nms, bbox_target,
from mmdet.core import (delta2bbox, multiclass_nms, bbox_target,
weighted_cross_entropy, weighted_smoothl1, accuracy)
@@ -60,7 +60,7 @@ class BBoxHead(nn.Module):
return cls_score, bbox_pred
def get_bbox_target(self, pos_proposals, neg_proposals, pos_gt_bboxes,
pos_gt_labels, rcnn_train_cfg):
pos_gt_labels, rcnn_train_cfg):
reg_num_classes = 1 if self.reg_class_agnostic else self.num_classes
cls_reg_targets = bbox_target(
pos_proposals,
@@ -85,7 +85,7 @@ class BBoxHead(nn.Module):
bbox_pred,
bbox_targets,
bbox_weights,
ave_factor=bbox_targets.size(0))
avg_factor=bbox_targets.size(0))
return losses
def get_det_bboxes(self,
@@ -101,15 +101,14 @@ class BBoxHead(nn.Module):
scores = F.softmax(cls_score, dim=1) if cls_score is not None else None
if bbox_pred is not None:
bboxes = bbox_transform_inv(rois[:, 1:], bbox_pred,
self.target_means, self.target_stds,
img_shape)
bboxes = delta2bbox(rois[:, 1:], bbox_pred, self.target_means,
self.target_stds, img_shape)
else:
bboxes = rois[:, 1:]
# TODO: add clip here
if rescale:
bboxes /= scale_factor.float()
bboxes /= scale_factor
if nms_cfg is None:
return bboxes, scores
......
@@ -43,17 +43,21 @@ class ConvFCRoIHead(BBoxHead):
self.fc_out_channels = fc_out_channels
# add shared convs and fcs
self.shared_convs, self.shared_fcs, last_layer_dim = self._add_conv_fc_branch(
self.num_shared_convs, self.num_shared_fcs, self.in_channels, True)
self.shared_convs, self.shared_fcs, last_layer_dim = \
self._add_conv_fc_branch(
self.num_shared_convs, self.num_shared_fcs, self.in_channels,
True)
self.shared_out_channels = last_layer_dim
# add cls specific branch
self.cls_convs, self.cls_fcs, self.cls_last_dim = self._add_conv_fc_branch(
self.num_cls_convs, self.num_cls_fcs, self.shared_out_channels)
self.cls_convs, self.cls_fcs, self.cls_last_dim = \
self._add_conv_fc_branch(
self.num_cls_convs, self.num_cls_fcs, self.shared_out_channels)
# add reg specific branch
self.reg_convs, self.reg_fcs, self.reg_last_dim = self._add_conv_fc_branch(
self.num_reg_convs, self.num_reg_fcs, self.shared_out_channels)
self.reg_convs, self.reg_fcs, self.reg_last_dim = \
self._add_conv_fc_branch(
self.num_reg_convs, self.num_reg_fcs, self.shared_out_channels)
if self.num_shared_fcs == 0 and not self.with_avg_pool:
if self.num_cls_fcs == 0:
......
import mmcv
from mmcv import torchpack
from mmcv.runner import obj_from_dict
from torch import nn
from . import (backbones, necks, roi_extractors, rpn_heads, bbox_heads,
mask_heads)
mask_heads, detectors)
__all__ = [
'build_backbone', 'build_neck', 'build_rpn_head', 'build_roi_extractor',
'build_bbox_head', 'build_mask_head'
'build_bbox_head', 'build_mask_head', 'build_detector'
]
def _build_module(cfg, parrent=None):
return cfg if isinstance(cfg, nn.Module) else torchpack.obj_from_dict(
cfg, parrent)
def _build_module(cfg, parrent=None, default_args=None):
return cfg if isinstance(cfg, nn.Module) else obj_from_dict(
cfg, parrent, default_args)
def build(cfg, parrent=None):
def build(cfg, parrent=None, default_args=None):
if isinstance(cfg, list):
modules = [_build_module(cfg_, parrent) for cfg_ in cfg]
modules = [_build_module(cfg_, parrent, default_args) for cfg_ in cfg]
return nn.Sequential(*modules)
else:
return _build_module(cfg, parrent)
return _build_module(cfg, parrent, default_args)
def build_backbone(cfg):
@@ -46,3 +45,7 @@ def build_bbox_head(cfg):
def build_mask_head(cfg):
return build(cfg, mask_heads)
def build_detector(cfg, train_cfg=None, test_cfg=None):
return build(cfg, detectors, dict(train_cfg=train_cfg, test_cfg=test_cfg))
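# Hypothetical sketch (not part of this commit) of what build() does with a
# config dict: obj_from_dict pops 'type', looks the class or function up in
# the given module and forwards the remaining keys (plus default_args) as
# kwargs. build_detector() follows the same pattern with `detectors` as the
# parent module. The values below are placeholders, not a working config.
def _demo_build_backbone():
    backbone_cfg = dict(
        type='resnet',
        depth=50,
        num_stages=4,
        out_indices=(0, 1, 2, 3),
        frozen_stages=1,
        style='pytorch')
    return build_backbone(backbone_cfg)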
from .detector import Detector
from .base import BaseDetector
from .rpn import RPN
from .faster_rcnn import FasterRCNN
from .mask_rcnn import MaskRCNN
__all__ = ['BaseDetector', 'RPN', 'FasterRCNN', 'MaskRCNN']
import logging
from abc import ABCMeta, abstractmethod
import mmcv
import numpy as np
import torch
import torch.nn as nn
from mmdet.core import tensor2imgs, get_classes
class BaseDetector(nn.Module):
"""Base class for detectors"""
__metaclass__ = ABCMeta
def __init__(self):
super(BaseDetector, self).__init__()
@property
def with_neck(self):
return hasattr(self, 'neck') and self.neck is not None
@property
def with_bbox(self):
return hasattr(self, 'bbox_head') and self.bbox_head is not None
@property
def with_mask(self):
return hasattr(self, 'mask_head') and self.mask_head is not None
@abstractmethod
def extract_feat(self, imgs):
pass
def extract_feats(self, imgs):
if isinstance(imgs, torch.Tensor):
return self.extract_feat(imgs)
elif isinstance(imgs, list):
for img in imgs:
yield self.extract_feat(img)
@abstractmethod
def forward_train(self, imgs, img_metas, **kwargs):
pass
@abstractmethod
def simple_test(self, img, img_meta, **kwargs):
pass
@abstractmethod
def aug_test(self, imgs, img_metas, **kwargs):
pass
def init_weights(self, pretrained=None):
if pretrained is not None:
logger = logging.getLogger()
logger.info('load model from: {}'.format(pretrained))
def forward_test(self, imgs, img_metas, **kwargs):
for var, name in [(imgs, 'imgs'), (img_metas, 'img_metas')]:
if not isinstance(var, list):
raise TypeError('{} must be a list, but got {}'.format(
name, type(var)))
num_augs = len(imgs)
if num_augs != len(img_metas):
raise ValueError(
'num of augmentations ({}) != num of image meta ({})'.format(
len(imgs), len(img_metas)))
# TODO: remove the restriction of imgs_per_gpu == 1 when prepared
imgs_per_gpu = imgs[0].size(0)
assert imgs_per_gpu == 1
if num_augs == 1:
return self.simple_test(imgs[0], img_metas[0], **kwargs)
else:
return self.aug_test(imgs, img_metas, **kwargs)
def forward(self, img, img_meta, return_loss=True, **kwargs):
if return_loss:
return self.forward_train(img, img_meta, **kwargs)
else:
return self.forward_test(img, img_meta, **kwargs)
def show_result(self,
data,
result,
img_norm_cfg,
dataset='coco',
score_thr=0.3):
img_tensor = data['img'][0]
img_metas = data['img_meta'][0].data[0]
imgs = tensor2imgs(img_tensor, **img_norm_cfg)
assert len(imgs) == len(img_metas)
if isinstance(dataset, str):
class_names = get_classes(dataset)
elif isinstance(dataset, list):
class_names = dataset
else:
raise TypeError('dataset must be a valid dataset name or a list'
' of class names, not {}'.format(type(dataset)))
for img, img_meta in zip(imgs, img_metas):
h, w, _ = img_meta['img_shape']
img_show = img[:h, :w, :]
labels = [
np.full(bbox.shape[0], i, dtype=np.int32)
for i, bbox in enumerate(result)
]
labels = np.concatenate(labels)
bboxes = np.vstack(result)
mmcv.imshow_det_bboxes(
img_show,
bboxes,
labels,
class_names=class_names,
score_thr=score_thr)
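# Hypothetical minimal subclass (not part of this commit) showing how the
# BaseDetector.forward() dispatch works: return_loss=True routes to
# forward_train(), return_loss=False routes to forward_test(), which picks
# simple_test() or aug_test() based on the number of augmented inputs.
class _DummyDetector(BaseDetector):

    def __init__(self):
        super(_DummyDetector, self).__init__()
        self.backbone = nn.Conv2d(3, 8, 3, padding=1)

    def extract_feat(self, imgs):
        return self.backbone(imgs)

    def forward_train(self, imgs, img_metas, **kwargs):
        return dict(loss_dummy=self.extract_feat(imgs).mean())

    def simple_test(self, img, img_meta, **kwargs):
        return self.extract_feat(img).detach().cpu().numpy()

    def aug_test(self, imgs, img_metas, **kwargs):
        return [self.simple_test(img, meta)
                for img, meta in zip(imgs, img_metas)]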
import torch
import torch.nn as nn
from .. import builder
from mmdet.core import (bbox2roi, bbox_mapping, split_combined_gt_polys,
bbox2result, multiclass_nms, merge_aug_proposals,
merge_aug_bboxes, merge_aug_masks, sample_proposals)
class Detector(nn.Module):
def __init__(self,
backbone,
neck=None,
rpn_head=None,
roi_block=None,
bbox_head=None,
mask_block=None,
mask_head=None,
rpn_train_cfg=None,
rpn_test_cfg=None,
rcnn_train_cfg=None,
rcnn_test_cfg=None,
pretrained=None):
super(Detector, self).__init__()
self.backbone = builder.build_backbone(backbone)
self.with_neck = True if neck is not None else False
if self.with_neck:
self.neck = builder.build_neck(neck)
self.with_rpn = True if rpn_head is not None else False
if self.with_rpn:
self.rpn_head = builder.build_rpn_head(rpn_head)
self.rpn_train_cfg = rpn_train_cfg
self.rpn_test_cfg = rpn_test_cfg
self.with_bbox = True if bbox_head is not None else False
if self.with_bbox:
self.bbox_roi_extractor = builder.build_roi_extractor(roi_block)
self.bbox_head = builder.build_bbox_head(bbox_head)
self.rcnn_train_cfg = rcnn_train_cfg
self.rcnn_test_cfg = rcnn_test_cfg
self.with_mask = True if mask_head is not None else False
if self.with_mask:
self.mask_roi_extractor = builder.build_roi_extractor(mask_block)
self.mask_head = builder.build_mask_head(mask_head)
self.init_weights(pretrained=pretrained)
def init_weights(self, pretrained=None):
if pretrained is not None:
print('load model from: {}'.format(pretrained))
self.backbone.init_weights(pretrained=pretrained)
if self.with_neck:
if isinstance(self.neck, nn.Sequential):
for m in self.neck:
m.init_weights()
else:
self.neck.init_weights()
if self.with_rpn:
self.rpn_head.init_weights()
if self.with_bbox:
self.bbox_roi_extractor.init_weights()
self.bbox_head.init_weights()
if self.with_mask:
self.mask_roi_extractor.init_weights()
self.mask_head.init_weights()
def forward(self,
img,
img_meta,
gt_bboxes=None,
proposals=None,
gt_labels=None,
gt_bboxes_ignore=None,
gt_mask_polys=None,
gt_poly_lens=None,
num_polys_per_mask=None,
return_loss=True,
return_bboxes=True,
rescale=False):
assert proposals is not None or self.with_rpn, "Either precomputed proposals or an RPN head is required."
if not return_loss:
return self.test(img, img_meta, proposals, rescale)
else:
losses = dict()
img_shapes = img_meta['img_shape']
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_shapes,
self.rpn_train_cfg)
rpn_losses = self.rpn_head.loss(*rpn_loss_inputs)
losses.update(rpn_losses)
if self.with_bbox:
if self.with_rpn:
proposal_inputs = rpn_outs + (img_shapes, self.rpn_test_cfg)
proposal_list = self.rpn_head.get_proposals(*proposal_inputs)
else:
proposal_list = proposals
(pos_inds, neg_inds, pos_proposals, neg_proposals,
pos_assigned_gt_inds,
pos_gt_bboxes, pos_gt_labels) = sample_proposals(
proposal_list, gt_bboxes, gt_bboxes_ignore, gt_labels,
self.rcnn_train_cfg)
labels, label_weights, bbox_targets, bbox_weights = \
self.bbox_head.get_bbox_target(
pos_proposals, neg_proposals, pos_gt_bboxes, pos_gt_labels,
self.rcnn_train_cfg)
rois = bbox2roi([
torch.cat([pos, neg], dim=0)
for pos, neg in zip(pos_proposals, neg_proposals)
])
# TODO: a more flexible way to configurate feat maps
roi_feats = self.bbox_roi_extractor(
x[:self.bbox_roi_extractor.num_inputs], rois)
cls_score, bbox_pred = self.bbox_head(roi_feats)
loss_bbox = self.bbox_head.loss(cls_score, bbox_pred, labels,
label_weights, bbox_targets,
bbox_weights)
losses.update(loss_bbox)
if self.with_mask:
gt_polys = split_combined_gt_polys(gt_mask_polys, gt_poly_lens,
num_polys_per_mask)
mask_targets = self.mask_head.get_mask_target(
pos_proposals, pos_assigned_gt_inds, gt_polys, img_meta,
self.rcnn_train_cfg)
pos_rois = bbox2roi(pos_proposals)
mask_feats = self.mask_roi_extractor(
x[:self.mask_roi_extractor.num_inputs], pos_rois)
mask_pred = self.mask_head(mask_feats)
losses['loss_mask'] = self.mask_head.loss(mask_pred, mask_targets,
torch.cat(pos_gt_labels))
return losses
def test(self, imgs, img_metas, proposals=None, rescale=False):
"""Test w/ or w/o augmentations."""
assert isinstance(imgs, list) and isinstance(img_metas, list)
assert len(imgs) == len(img_metas)
img_per_gpu = imgs[0].size(0)
assert img_per_gpu == 1
if len(imgs) == 1:
return self.simple_test(imgs[0], img_metas[0], proposals, rescale)
else:
return self.aug_test(imgs, img_metas, proposals, rescale)
def simple_test_rpn(self, x, img_meta):
img_shapes = img_meta['img_shape']
scale_factor = img_meta['scale_factor']
rpn_outs = self.rpn_head(x)
proposal_inputs = rpn_outs + (img_shapes, self.rpn_test_cfg)
proposal_list = self.rpn_head.get_proposals(*proposal_inputs)[0]
return proposal_list
def simple_test_bboxes(self, x, img_meta, proposals, rescale=False):
"""Test only det bboxes without augmentation."""
rois = bbox2roi(proposals)
roi_feats = self.bbox_roi_extractor(
x[:len(self.bbox_roi_extractor.featmap_strides)], rois)
cls_score, bbox_pred = self.bbox_head(roi_feats)
# image shape of the first image in the batch (only one)
img_shape = img_meta['img_shape'][0]
scale_factor = img_meta['scale_factor']
det_bboxes, det_labels = self.bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
nms_cfg=self.rcnn_test_cfg)
return det_bboxes, det_labels
def simple_test_mask(self,
x,
img_meta,
det_bboxes,
det_labels,
rescale=False):
# image shape of the first image in the batch (only one)
img_shape = img_meta['img_shape'][0]
scale_factor = img_meta['scale_factor']
if det_bboxes.shape[0] == 0:
segm_result = [[] for _ in range(self.mask_head.num_classes - 1)]
else:
# if det_bboxes is rescaled to the original image size, we need to
# rescale it back to the testing scale to obtain RoIs.
_bboxes = (det_bboxes[:, :4] * scale_factor.float()
if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
mask_feats = self.mask_roi_extractor(
x[:len(self.mask_roi_extractor.featmap_strides)], mask_rois)
mask_pred = self.mask_head(mask_feats)
segm_result = self.mask_head.get_seg_masks(
mask_pred,
det_bboxes,
det_labels,
self.rcnn_test_cfg,
ori_scale=img_meta['ori_shape'])
return segm_result
def simple_test(self, img, img_meta, proposals=None, rescale=False):
"""Test without augmentation."""
# get feature maps
x = self.backbone(img)
if self.with_neck:
x = self.neck(x)
if self.with_rpn:
proposals = self.simple_test_rpn(x, img_meta)
if self.with_bbox:
# BUG proposals shape?
det_bboxes, det_labels = self.simple_test_bboxes(
x, img_meta, [proposals], rescale=rescale)
bbox_result = bbox2result(det_bboxes, det_labels,
self.bbox_head.num_classes)
if not self.with_mask:
return bbox_result
segm_result = self.simple_test_mask(
x, img_meta, det_bboxes, det_labels, rescale=rescale)
return bbox_result, segm_result
else:
proposals[:, :4] /= img_meta['scale_factor'].float()
return proposals.cpu().numpy()
# TODO aug test haven't been verified
def aug_test_bboxes(self, imgs, img_metas):
"""Test with augmentations for det bboxes."""
# step 1: get RPN proposals for augmented images, apply NMS to the
# union of all proposals.
aug_proposals = []
for img, img_meta in zip(imgs, img_metas):
x = self.backbone(img)
if self.neck is not None:
x = self.neck(x)
rpn_outs = self.rpn_head(x)
proposal_inputs = rpn_outs + (img_meta['shape_scale'],
self.rpn_test_cfg)
proposal_list = self.rpn_head.get_proposals(*proposal_inputs)
assert len(proposal_list) == 1
aug_proposals.append(proposal_list[0]) # len(proposal_list) = 1
# after merging, proposals will be rescaled to the original image size
merged_proposals = merge_aug_proposals(aug_proposals, img_metas,
self.rpn_test_cfg)
# step 2: Given merged proposals, predict bboxes for augmented images,
# output the union of these bboxes.
aug_bboxes = []
aug_scores = []
for img, img_meta in zip(imgs, img_metas):
# only one image in the batch
img_shape = img_meta['shape_scale'][0]
flip = img_meta['flip'][0]
proposals = bbox_mapping(merged_proposals[:, :4], img_shape, flip)
rois = bbox2roi([proposals])
# recompute feature maps to save GPU memory
x = self.backbone(img)
if self.neck is not None:
x = self.neck(x)
roi_feats = self.bbox_roi_extractor(
x[:len(self.bbox_roi_extractor.featmap_strides)], rois)
cls_score, bbox_pred = self.bbox_head(roi_feats)
bboxes, scores = self.bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
rescale=False,
nms_cfg=None)
aug_bboxes.append(bboxes)
aug_scores.append(scores)
# after merging, bboxes will be rescaled to the original image size
merged_bboxes, merged_scores = merge_aug_bboxes(
aug_bboxes, aug_scores, img_metas, self.rcnn_test_cfg)
det_bboxes, det_labels = multiclass_nms(
merged_bboxes, merged_scores, self.rcnn_test_cfg.score_thr,
self.rcnn_test_cfg.nms_thr, self.rcnn_test_cfg.max_per_img)
return det_bboxes, det_labels
def aug_test_mask(self,
imgs,
img_metas,
det_bboxes,
det_labels,
rescale=False):
# step 3: Given merged bboxes, predict masks for augmented images,
# scores of masks are averaged across augmented images.
if rescale:
_det_bboxes = det_bboxes
else:
_det_bboxes = det_bboxes.clone()
_det_bboxes[:, :4] *= img_metas[0]['shape_scale'][0][-1]
if det_bboxes.shape[0] == 0:
segm_result = [[] for _ in range(self.mask_head.num_classes - 1)]
else:
aug_masks = []
for img, img_meta in zip(imgs, img_metas):
img_shape = img_meta['shape_scale'][0]
flip = img_meta['flip'][0]
_bboxes = bbox_mapping(det_bboxes[:, :4], img_shape, flip)
mask_rois = bbox2roi([_bboxes])
x = self.backbone(img)
if self.neck is not None:
x = self.neck(x)
mask_feats = self.mask_roi_extractor(
x[:len(self.mask_roi_extractor.featmap_strides)],
mask_rois)
mask_pred = self.mask_head(mask_feats)
# convert to numpy array to save memory
aug_masks.append(mask_pred.sigmoid().cpu().numpy())
merged_masks = merge_aug_masks(aug_masks, img_metas,
self.rcnn_test_cfg)
segm_result = self.mask_head.get_seg_masks(
merged_masks, _det_bboxes, det_labels,
img_metas[0]['shape_scale'][0], self.rcnn_test_cfg, rescale)
return segm_result
def aug_test(self, imgs, img_metas, rescale=False):
"""Test with augmentations.
If rescale is False, then returned bboxes and masks will fit the scale
of imgs[0].
"""
# aug test det bboxes
det_bboxes, det_labels = self.aug_test_bboxes(imgs, img_metas)
if rescale:
_det_bboxes = det_bboxes
else:
_det_bboxes = det_bboxes.clone()
_det_bboxes[:, :4] *= img_metas[0]['shape_scale'][0][-1]
bbox_result = bbox2result(_det_bboxes, det_labels,
self.bbox_head.num_classes)
if not self.with_mask:
return bbox_result
segm_result = self.aug_test_mask(
imgs, img_metas, det_bboxes, det_labels, rescale=rescale)
return bbox_result, segm_result
from .two_stage import TwoStageDetector
class FasterRCNN(TwoStageDetector):
def __init__(self,
backbone,
neck,
rpn_head,
bbox_roi_extractor,
bbox_head,
train_cfg,
test_cfg,
pretrained=None):
super(FasterRCNN, self).__init__(
backbone=backbone,
neck=neck,
rpn_head=rpn_head,
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained)
from .two_stage import TwoStageDetector
class MaskRCNN(TwoStageDetector):
def __init__(self,
backbone,
neck,
rpn_head,
bbox_roi_extractor,
bbox_head,
mask_roi_extractor,
mask_head,
train_cfg,
test_cfg,
pretrained=None):
super(MaskRCNN, self).__init__(
backbone=backbone,
neck=neck,
rpn_head=rpn_head,
bbox_roi_extractor=bbox_roi_extractor,
bbox_head=bbox_head,
mask_roi_extractor=mask_roi_extractor,
mask_head=mask_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
pretrained=pretrained)
def show_result(self, data, result, img_norm_cfg, **kwargs):
# TODO: show segmentation masks
assert isinstance(result, tuple)
assert len(result) == 2 # (bbox_results, segm_results)
super(MaskRCNN, self).show_result(data, result[0], img_norm_cfg,
**kwargs)