Commit a4372e17 authored by huchen's avatar huchen
Browse files

Merge branch 'hepj_work' into 'main'

增加conformer代码

See merge request dcutoolkit/deeplearing/dlexamples_new!7
parents 7f99c1c3 142dcf29
from .base_sampler import BaseSampler
from .combined_sampler import CombinedSampler
from .instance_balanced_pos_sampler import InstanceBalancedPosSampler
from .iou_balanced_neg_sampler import IoUBalancedNegSampler
from .ohem_sampler import OHEMSampler
from .pseudo_sampler import PseudoSampler
from .random_sampler import RandomSampler
from .sampling_result import SamplingResult
from .score_hlr_sampler import ScoreHLRSampler
__all__ = [
'BaseSampler', 'PseudoSampler', 'RandomSampler',
'InstanceBalancedPosSampler', 'IoUBalancedNegSampler', 'CombinedSampler',
'OHEMSampler', 'SamplingResult', 'ScoreHLRSampler'
]
from abc import ABCMeta, abstractmethod
import torch
from .sampling_result import SamplingResult
class BaseSampler(metaclass=ABCMeta):
"""Base class of samplers."""
def __init__(self,
num,
pos_fraction,
neg_pos_ub=-1,
add_gt_as_proposals=True,
**kwargs):
self.num = num
self.pos_fraction = pos_fraction
self.neg_pos_ub = neg_pos_ub
self.add_gt_as_proposals = add_gt_as_proposals
self.pos_sampler = self
self.neg_sampler = self
@abstractmethod
def _sample_pos(self, assign_result, num_expected, **kwargs):
"""Sample positive samples."""
pass
@abstractmethod
def _sample_neg(self, assign_result, num_expected, **kwargs):
"""Sample negative samples."""
pass
def sample(self,
assign_result,
bboxes,
gt_bboxes,
gt_labels=None,
**kwargs):
"""Sample positive and negative bboxes.
This is a simple implementation of bbox sampling given candidates,
assigning results and ground truth bboxes.
Args:
assign_result (:obj:`AssignResult`): Bbox assigning results.
bboxes (Tensor): Boxes to be sampled from.
gt_bboxes (Tensor): Ground truth bboxes.
gt_labels (Tensor, optional): Class labels of ground truth bboxes.
Returns:
:obj:`SamplingResult`: Sampling result.
Example:
>>> from mmdet.core.bbox import RandomSampler
>>> from mmdet.core.bbox import AssignResult
>>> from mmdet.core.bbox.demodata import ensure_rng, random_boxes
>>> rng = ensure_rng(None)
>>> assign_result = AssignResult.random(rng=rng)
>>> bboxes = random_boxes(assign_result.num_preds, rng=rng)
>>> gt_bboxes = random_boxes(assign_result.num_gts, rng=rng)
>>> gt_labels = None
>>> self = RandomSampler(num=32, pos_fraction=0.5, neg_pos_ub=-1,
>>> add_gt_as_proposals=False)
>>> self = self.sample(assign_result, bboxes, gt_bboxes, gt_labels)
"""
if len(bboxes.shape) < 2:
bboxes = bboxes[None, :]
bboxes = bboxes[:, :4]
gt_flags = bboxes.new_zeros((bboxes.shape[0], ), dtype=torch.uint8)
if self.add_gt_as_proposals and len(gt_bboxes) > 0:
if gt_labels is None:
raise ValueError(
'gt_labels must be given when add_gt_as_proposals is True')
bboxes = torch.cat([gt_bboxes, bboxes], dim=0)
assign_result.add_gt_(gt_labels)
gt_ones = bboxes.new_ones(gt_bboxes.shape[0], dtype=torch.uint8)
gt_flags = torch.cat([gt_ones, gt_flags])
num_expected_pos = int(self.num * self.pos_fraction)
pos_inds = self.pos_sampler._sample_pos(
assign_result, num_expected_pos, bboxes=bboxes, **kwargs)
# We found that sampled indices have duplicated items occasionally.
# (may be a bug of PyTorch)
pos_inds = pos_inds.unique()
num_sampled_pos = pos_inds.numel()
num_expected_neg = self.num - num_sampled_pos
if self.neg_pos_ub >= 0:
_pos = max(1, num_sampled_pos)
neg_upper_bound = int(self.neg_pos_ub * _pos)
if num_expected_neg > neg_upper_bound:
num_expected_neg = neg_upper_bound
neg_inds = self.neg_sampler._sample_neg(
assign_result, num_expected_neg, bboxes=bboxes, **kwargs)
neg_inds = neg_inds.unique()
sampling_result = SamplingResult(pos_inds, neg_inds, bboxes, gt_bboxes,
assign_result, gt_flags)
return sampling_result
from ..builder import BBOX_SAMPLERS, build_sampler
from .base_sampler import BaseSampler
@BBOX_SAMPLERS.register_module()
class CombinedSampler(BaseSampler):
"""A sampler that combines positive sampler and negative sampler."""
def __init__(self, pos_sampler, neg_sampler, **kwargs):
super(CombinedSampler, self).__init__(**kwargs)
self.pos_sampler = build_sampler(pos_sampler, **kwargs)
self.neg_sampler = build_sampler(neg_sampler, **kwargs)
def _sample_pos(self, **kwargs):
"""Sample positive samples."""
raise NotImplementedError
def _sample_neg(self, **kwargs):
"""Sample negative samples."""
raise NotImplementedError
import numpy as np
import torch
from ..builder import BBOX_SAMPLERS
from .random_sampler import RandomSampler
@BBOX_SAMPLERS.register_module()
class InstanceBalancedPosSampler(RandomSampler):
"""Instance balanced sampler that samples equal number of positive samples
for each instance."""
def _sample_pos(self, assign_result, num_expected, **kwargs):
"""Sample positive boxes.
Args:
assign_result (:obj:`AssignResult`): The assigned results of boxes.
num_expected (int): The number of expected positive samples
Returns:
Tensor or ndarray: sampled indices.
"""
pos_inds = torch.nonzero(assign_result.gt_inds > 0, as_tuple=False)
if pos_inds.numel() != 0:
pos_inds = pos_inds.squeeze(1)
if pos_inds.numel() <= num_expected:
return pos_inds
else:
unique_gt_inds = assign_result.gt_inds[pos_inds].unique()
num_gts = len(unique_gt_inds)
num_per_gt = int(round(num_expected / float(num_gts)) + 1)
sampled_inds = []
for i in unique_gt_inds:
inds = torch.nonzero(
assign_result.gt_inds == i.item(), as_tuple=False)
if inds.numel() != 0:
inds = inds.squeeze(1)
else:
continue
if len(inds) > num_per_gt:
inds = self.random_choice(inds, num_per_gt)
sampled_inds.append(inds)
sampled_inds = torch.cat(sampled_inds)
if len(sampled_inds) < num_expected:
num_extra = num_expected - len(sampled_inds)
extra_inds = np.array(
list(set(pos_inds.cpu()) - set(sampled_inds.cpu())))
if len(extra_inds) > num_extra:
extra_inds = self.random_choice(extra_inds, num_extra)
extra_inds = torch.from_numpy(extra_inds).to(
assign_result.gt_inds.device).long()
sampled_inds = torch.cat([sampled_inds, extra_inds])
elif len(sampled_inds) > num_expected:
sampled_inds = self.random_choice(sampled_inds, num_expected)
return sampled_inds
import numpy as np
import torch
from ..builder import BBOX_SAMPLERS
from .random_sampler import RandomSampler
@BBOX_SAMPLERS.register_module()
class IoUBalancedNegSampler(RandomSampler):
"""IoU Balanced Sampling.
arXiv: https://arxiv.org/pdf/1904.02701.pdf (CVPR 2019)
Sampling proposals according to their IoU. `floor_fraction` of needed RoIs
are sampled from proposals whose IoU are lower than `floor_thr` randomly.
The others are sampled from proposals whose IoU are higher than
`floor_thr`. These proposals are sampled from some bins evenly, which are
split by `num_bins` via IoU evenly.
Args:
num (int): number of proposals.
pos_fraction (float): fraction of positive proposals.
floor_thr (float): threshold (minimum) IoU for IoU balanced sampling,
set to -1 if all using IoU balanced sampling.
floor_fraction (float): sampling fraction of proposals under floor_thr.
num_bins (int): number of bins in IoU balanced sampling.
"""
def __init__(self,
num,
pos_fraction,
floor_thr=-1,
floor_fraction=0,
num_bins=3,
**kwargs):
super(IoUBalancedNegSampler, self).__init__(num, pos_fraction,
**kwargs)
assert floor_thr >= 0 or floor_thr == -1
assert 0 <= floor_fraction <= 1
assert num_bins >= 1
self.floor_thr = floor_thr
self.floor_fraction = floor_fraction
self.num_bins = num_bins
def sample_via_interval(self, max_overlaps, full_set, num_expected):
"""Sample according to the iou interval.
Args:
max_overlaps (torch.Tensor): IoU between bounding boxes and ground
truth boxes.
full_set (set(int)): A full set of indices of boxes。
num_expected (int): Number of expected samples。
Returns:
np.ndarray: Indices of samples
"""
max_iou = max_overlaps.max()
iou_interval = (max_iou - self.floor_thr) / self.num_bins
per_num_expected = int(num_expected / self.num_bins)
sampled_inds = []
for i in range(self.num_bins):
start_iou = self.floor_thr + i * iou_interval
end_iou = self.floor_thr + (i + 1) * iou_interval
tmp_set = set(
np.where(
np.logical_and(max_overlaps >= start_iou,
max_overlaps < end_iou))[0])
tmp_inds = list(tmp_set & full_set)
if len(tmp_inds) > per_num_expected:
tmp_sampled_set = self.random_choice(tmp_inds,
per_num_expected)
else:
tmp_sampled_set = np.array(tmp_inds, dtype=np.int)
sampled_inds.append(tmp_sampled_set)
sampled_inds = np.concatenate(sampled_inds)
if len(sampled_inds) < num_expected:
num_extra = num_expected - len(sampled_inds)
extra_inds = np.array(list(full_set - set(sampled_inds)))
if len(extra_inds) > num_extra:
extra_inds = self.random_choice(extra_inds, num_extra)
sampled_inds = np.concatenate([sampled_inds, extra_inds])
return sampled_inds
def _sample_neg(self, assign_result, num_expected, **kwargs):
"""Sample negative boxes.
Args:
assign_result (:obj:`AssignResult`): The assigned results of boxes.
num_expected (int): The number of expected negative samples
Returns:
Tensor or ndarray: sampled indices.
"""
neg_inds = torch.nonzero(assign_result.gt_inds == 0, as_tuple=False)
if neg_inds.numel() != 0:
neg_inds = neg_inds.squeeze(1)
if len(neg_inds) <= num_expected:
return neg_inds
else:
max_overlaps = assign_result.max_overlaps.cpu().numpy()
# balance sampling for negative samples
neg_set = set(neg_inds.cpu().numpy())
if self.floor_thr > 0:
floor_set = set(
np.where(
np.logical_and(max_overlaps >= 0,
max_overlaps < self.floor_thr))[0])
iou_sampling_set = set(
np.where(max_overlaps >= self.floor_thr)[0])
elif self.floor_thr == 0:
floor_set = set(np.where(max_overlaps == 0)[0])
iou_sampling_set = set(
np.where(max_overlaps > self.floor_thr)[0])
else:
floor_set = set()
iou_sampling_set = set(
np.where(max_overlaps > self.floor_thr)[0])
# for sampling interval calculation
self.floor_thr = 0
floor_neg_inds = list(floor_set & neg_set)
iou_sampling_neg_inds = list(iou_sampling_set & neg_set)
num_expected_iou_sampling = int(num_expected *
(1 - self.floor_fraction))
if len(iou_sampling_neg_inds) > num_expected_iou_sampling:
if self.num_bins >= 2:
iou_sampled_inds = self.sample_via_interval(
max_overlaps, set(iou_sampling_neg_inds),
num_expected_iou_sampling)
else:
iou_sampled_inds = self.random_choice(
iou_sampling_neg_inds, num_expected_iou_sampling)
else:
iou_sampled_inds = np.array(
iou_sampling_neg_inds, dtype=np.int)
num_expected_floor = num_expected - len(iou_sampled_inds)
if len(floor_neg_inds) > num_expected_floor:
sampled_floor_inds = self.random_choice(
floor_neg_inds, num_expected_floor)
else:
sampled_floor_inds = np.array(floor_neg_inds, dtype=np.int)
sampled_inds = np.concatenate(
(sampled_floor_inds, iou_sampled_inds))
if len(sampled_inds) < num_expected:
num_extra = num_expected - len(sampled_inds)
extra_inds = np.array(list(neg_set - set(sampled_inds)))
if len(extra_inds) > num_extra:
extra_inds = self.random_choice(extra_inds, num_extra)
sampled_inds = np.concatenate((sampled_inds, extra_inds))
sampled_inds = torch.from_numpy(sampled_inds).long().to(
assign_result.gt_inds.device)
return sampled_inds
import torch
from ..builder import BBOX_SAMPLERS
from ..transforms import bbox2roi
from .base_sampler import BaseSampler
@BBOX_SAMPLERS.register_module()
class OHEMSampler(BaseSampler):
r"""Online Hard Example Mining Sampler described in `Training Region-based
Object Detectors with Online Hard Example Mining
<https://arxiv.org/abs/1604.03540>`_.
"""
def __init__(self,
num,
pos_fraction,
context,
neg_pos_ub=-1,
add_gt_as_proposals=True,
**kwargs):
super(OHEMSampler, self).__init__(num, pos_fraction, neg_pos_ub,
add_gt_as_proposals)
self.context = context
if not hasattr(self.context, 'num_stages'):
self.bbox_head = self.context.bbox_head
else:
self.bbox_head = self.context.bbox_head[self.context.current_stage]
def hard_mining(self, inds, num_expected, bboxes, labels, feats):
with torch.no_grad():
rois = bbox2roi([bboxes])
if not hasattr(self.context, 'num_stages'):
bbox_results = self.context._bbox_forward(feats, rois)
else:
bbox_results = self.context._bbox_forward(
self.context.current_stage, feats, rois)
cls_score = bbox_results['cls_score']
loss = self.bbox_head.loss(
cls_score=cls_score,
bbox_pred=None,
rois=rois,
labels=labels,
label_weights=cls_score.new_ones(cls_score.size(0)),
bbox_targets=None,
bbox_weights=None,
reduction_override='none')['loss_cls']
_, topk_loss_inds = loss.topk(num_expected)
return inds[topk_loss_inds]
def _sample_pos(self,
assign_result,
num_expected,
bboxes=None,
feats=None,
**kwargs):
"""Sample positive boxes.
Args:
assign_result (:obj:`AssignResult`): Assigned results
num_expected (int): Number of expected positive samples
bboxes (torch.Tensor, optional): Boxes. Defaults to None.
feats (list[torch.Tensor], optional): Multi-level features.
Defaults to None.
Returns:
torch.Tensor: Indices of positive samples
"""
# Sample some hard positive samples
pos_inds = torch.nonzero(assign_result.gt_inds > 0, as_tuple=False)
if pos_inds.numel() != 0:
pos_inds = pos_inds.squeeze(1)
if pos_inds.numel() <= num_expected:
return pos_inds
else:
return self.hard_mining(pos_inds, num_expected, bboxes[pos_inds],
assign_result.labels[pos_inds], feats)
def _sample_neg(self,
assign_result,
num_expected,
bboxes=None,
feats=None,
**kwargs):
"""Sample negative boxes.
Args:
assign_result (:obj:`AssignResult`): Assigned results
num_expected (int): Number of expected negative samples
bboxes (torch.Tensor, optional): Boxes. Defaults to None.
feats (list[torch.Tensor], optional): Multi-level features.
Defaults to None.
Returns:
torch.Tensor: Indices of negative samples
"""
# Sample some hard negative samples
neg_inds = torch.nonzero(assign_result.gt_inds == 0, as_tuple=False)
if neg_inds.numel() != 0:
neg_inds = neg_inds.squeeze(1)
if len(neg_inds) <= num_expected:
return neg_inds
else:
neg_labels = assign_result.labels.new_empty(
neg_inds.size(0)).fill_(self.bbox_head.num_classes)
return self.hard_mining(neg_inds, num_expected, bboxes[neg_inds],
neg_labels, feats)
import torch
from ..builder import BBOX_SAMPLERS
from .base_sampler import BaseSampler
from .sampling_result import SamplingResult
@BBOX_SAMPLERS.register_module()
class PseudoSampler(BaseSampler):
"""A pseudo sampler that does not do sampling actually."""
def __init__(self, **kwargs):
pass
def _sample_pos(self, **kwargs):
"""Sample positive samples."""
raise NotImplementedError
def _sample_neg(self, **kwargs):
"""Sample negative samples."""
raise NotImplementedError
def sample(self, assign_result, bboxes, gt_bboxes, **kwargs):
"""Directly returns the positive and negative indices of samples.
Args:
assign_result (:obj:`AssignResult`): Assigned results
bboxes (torch.Tensor): Bounding boxes
gt_bboxes (torch.Tensor): Ground truth boxes
Returns:
:obj:`SamplingResult`: sampler results
"""
pos_inds = torch.nonzero(
assign_result.gt_inds > 0, as_tuple=False).squeeze(-1).unique()
neg_inds = torch.nonzero(
assign_result.gt_inds == 0, as_tuple=False).squeeze(-1).unique()
gt_flags = bboxes.new_zeros(bboxes.shape[0], dtype=torch.uint8)
sampling_result = SamplingResult(pos_inds, neg_inds, bboxes, gt_bboxes,
assign_result, gt_flags)
return sampling_result
import torch
from ..builder import BBOX_SAMPLERS
from .base_sampler import BaseSampler
@BBOX_SAMPLERS.register_module()
class RandomSampler(BaseSampler):
"""Random sampler.
Args:
num (int): Number of samples
pos_fraction (float): Fraction of positive samples
neg_pos_up (int, optional): Upper bound number of negative and
positive samples. Defaults to -1.
add_gt_as_proposals (bool, optional): Whether to add ground truth
boxes as proposals. Defaults to True.
"""
def __init__(self,
num,
pos_fraction,
neg_pos_ub=-1,
add_gt_as_proposals=True,
**kwargs):
from mmdet.core.bbox import demodata
super(RandomSampler, self).__init__(num, pos_fraction, neg_pos_ub,
add_gt_as_proposals)
self.rng = demodata.ensure_rng(kwargs.get('rng', None))
def random_choice(self, gallery, num):
"""Random select some elements from the gallery.
If `gallery` is a Tensor, the returned indices will be a Tensor;
If `gallery` is a ndarray or list, the returned indices will be a
ndarray.
Args:
gallery (Tensor | ndarray | list): indices pool.
num (int): expected sample num.
Returns:
Tensor or ndarray: sampled indices.
"""
assert len(gallery) >= num
is_tensor = isinstance(gallery, torch.Tensor)
if not is_tensor:
if torch.cuda.is_available():
device = torch.cuda.current_device()
else:
device = 'cpu'
gallery = torch.tensor(gallery, dtype=torch.long, device=device)
perm = torch.randperm(gallery.numel(), device=gallery.device)[:num]
rand_inds = gallery[perm]
if not is_tensor:
rand_inds = rand_inds.cpu().numpy()
return rand_inds
def _sample_pos(self, assign_result, num_expected, **kwargs):
"""Randomly sample some positive samples."""
pos_inds = torch.nonzero(assign_result.gt_inds > 0, as_tuple=False)
if pos_inds.numel() != 0:
pos_inds = pos_inds.squeeze(1)
if pos_inds.numel() <= num_expected:
return pos_inds
else:
return self.random_choice(pos_inds, num_expected)
def _sample_neg(self, assign_result, num_expected, **kwargs):
"""Randomly sample some negative samples."""
neg_inds = torch.nonzero(assign_result.gt_inds == 0, as_tuple=False)
if neg_inds.numel() != 0:
neg_inds = neg_inds.squeeze(1)
if len(neg_inds) <= num_expected:
return neg_inds
else:
return self.random_choice(neg_inds, num_expected)
import torch
from mmdet.utils import util_mixins
class SamplingResult(util_mixins.NiceRepr):
"""Bbox sampling result.
Example:
>>> # xdoctest: +IGNORE_WANT
>>> from mmdet.core.bbox.samplers.sampling_result import * # NOQA
>>> self = SamplingResult.random(rng=10)
>>> print(f'self = {self}')
self = <SamplingResult({
'neg_bboxes': torch.Size([12, 4]),
'neg_inds': tensor([ 0, 1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
'num_gts': 4,
'pos_assigned_gt_inds': tensor([], dtype=torch.int64),
'pos_bboxes': torch.Size([0, 4]),
'pos_inds': tensor([], dtype=torch.int64),
'pos_is_gt': tensor([], dtype=torch.uint8)
})>
"""
def __init__(self, pos_inds, neg_inds, bboxes, gt_bboxes, assign_result,
gt_flags):
self.pos_inds = pos_inds
self.neg_inds = neg_inds
self.pos_bboxes = bboxes[pos_inds]
self.neg_bboxes = bboxes[neg_inds]
self.pos_is_gt = gt_flags[pos_inds]
self.num_gts = gt_bboxes.shape[0]
self.pos_assigned_gt_inds = assign_result.gt_inds[pos_inds] - 1
if gt_bboxes.numel() == 0:
# hack for index error case
assert self.pos_assigned_gt_inds.numel() == 0
self.pos_gt_bboxes = torch.empty_like(gt_bboxes).view(-1, 4)
else:
if len(gt_bboxes.shape) < 2:
gt_bboxes = gt_bboxes.view(-1, 4)
self.pos_gt_bboxes = gt_bboxes[self.pos_assigned_gt_inds, :]
if assign_result.labels is not None:
self.pos_gt_labels = assign_result.labels[pos_inds]
else:
self.pos_gt_labels = None
@property
def bboxes(self):
"""torch.Tensor: concatenated positive and negative boxes"""
return torch.cat([self.pos_bboxes, self.neg_bboxes])
def to(self, device):
"""Change the device of the data inplace.
Example:
>>> self = SamplingResult.random()
>>> print(f'self = {self.to(None)}')
>>> # xdoctest: +REQUIRES(--gpu)
>>> print(f'self = {self.to(0)}')
"""
_dict = self.__dict__
for key, value in _dict.items():
if isinstance(value, torch.Tensor):
_dict[key] = value.to(device)
return self
def __nice__(self):
data = self.info.copy()
data['pos_bboxes'] = data.pop('pos_bboxes').shape
data['neg_bboxes'] = data.pop('neg_bboxes').shape
parts = [f"'{k}': {v!r}" for k, v in sorted(data.items())]
body = ' ' + ',\n '.join(parts)
return '{\n' + body + '\n}'
@property
def info(self):
"""Returns a dictionary of info about the object."""
return {
'pos_inds': self.pos_inds,
'neg_inds': self.neg_inds,
'pos_bboxes': self.pos_bboxes,
'neg_bboxes': self.neg_bboxes,
'pos_is_gt': self.pos_is_gt,
'num_gts': self.num_gts,
'pos_assigned_gt_inds': self.pos_assigned_gt_inds,
}
@classmethod
def random(cls, rng=None, **kwargs):
"""
Args:
rng (None | int | numpy.random.RandomState): seed or state.
kwargs (keyword arguments):
- num_preds: number of predicted boxes
- num_gts: number of true boxes
- p_ignore (float): probability of a predicted box assinged to \
an ignored truth.
- p_assigned (float): probability of a predicted box not being \
assigned.
- p_use_label (float | bool): with labels or not.
Returns:
:obj:`SamplingResult`: Randomly generated sampling result.
Example:
>>> from mmdet.core.bbox.samplers.sampling_result import * # NOQA
>>> self = SamplingResult.random()
>>> print(self.__dict__)
"""
from mmdet.core.bbox.samplers.random_sampler import RandomSampler
from mmdet.core.bbox.assigners.assign_result import AssignResult
from mmdet.core.bbox import demodata
rng = demodata.ensure_rng(rng)
# make probabalistic?
num = 32
pos_fraction = 0.5
neg_pos_ub = -1
assign_result = AssignResult.random(rng=rng, **kwargs)
# Note we could just compute an assignment
bboxes = demodata.random_boxes(assign_result.num_preds, rng=rng)
gt_bboxes = demodata.random_boxes(assign_result.num_gts, rng=rng)
if rng.rand() > 0.2:
# sometimes algorithms squeeze their data, be robust to that
gt_bboxes = gt_bboxes.squeeze()
bboxes = bboxes.squeeze()
if assign_result.labels is None:
gt_labels = None
else:
gt_labels = None # todo
if gt_labels is None:
add_gt_as_proposals = False
else:
add_gt_as_proposals = True # make probabalistic?
sampler = RandomSampler(
num,
pos_fraction,
neg_pos_ub=neg_pos_ub,
add_gt_as_proposals=add_gt_as_proposals,
rng=rng)
self = sampler.sample(assign_result, bboxes, gt_bboxes, gt_labels)
return self
import torch
from mmcv.ops import nms_match
from ..builder import BBOX_SAMPLERS
from ..transforms import bbox2roi
from .base_sampler import BaseSampler
from .sampling_result import SamplingResult
@BBOX_SAMPLERS.register_module()
class ScoreHLRSampler(BaseSampler):
r"""Importance-based Sample Reweighting (ISR_N), described in `Prime Sample
Attention in Object Detection <https://arxiv.org/abs/1904.04821>`_.
Score hierarchical local rank (HLR) differentiates with RandomSampler in
negative part. It firstly computes Score-HLR in a two-step way,
then linearly maps score hlr to the loss weights.
Args:
num (int): Total number of sampled RoIs.
pos_fraction (float): Fraction of positive samples.
context (:class:`BaseRoIHead`): RoI head that the sampler belongs to.
neg_pos_ub (int): Upper bound of the ratio of num negative to num
positive, -1 means no upper bound.
add_gt_as_proposals (bool): Whether to add ground truth as proposals.
k (float): Power of the non-linear mapping.
bias (float): Shift of the non-linear mapping.
score_thr (float): Minimum score that a negative sample is to be
considered as valid bbox.
"""
def __init__(self,
num,
pos_fraction,
context,
neg_pos_ub=-1,
add_gt_as_proposals=True,
k=0.5,
bias=0,
score_thr=0.05,
iou_thr=0.5,
**kwargs):
super().__init__(num, pos_fraction, neg_pos_ub, add_gt_as_proposals)
self.k = k
self.bias = bias
self.score_thr = score_thr
self.iou_thr = iou_thr
self.context = context
# context of cascade detectors is a list, so distinguish them here.
if not hasattr(context, 'num_stages'):
self.bbox_roi_extractor = context.bbox_roi_extractor
self.bbox_head = context.bbox_head
self.with_shared_head = context.with_shared_head
if self.with_shared_head:
self.shared_head = context.shared_head
else:
self.bbox_roi_extractor = context.bbox_roi_extractor[
context.current_stage]
self.bbox_head = context.bbox_head[context.current_stage]
@staticmethod
def random_choice(gallery, num):
"""Randomly select some elements from the gallery.
If `gallery` is a Tensor, the returned indices will be a Tensor;
If `gallery` is a ndarray or list, the returned indices will be a
ndarray.
Args:
gallery (Tensor | ndarray | list): indices pool.
num (int): expected sample num.
Returns:
Tensor or ndarray: sampled indices.
"""
assert len(gallery) >= num
is_tensor = isinstance(gallery, torch.Tensor)
if not is_tensor:
if torch.cuda.is_available():
device = torch.cuda.current_device()
else:
device = 'cpu'
gallery = torch.tensor(gallery, dtype=torch.long, device=device)
perm = torch.randperm(gallery.numel(), device=gallery.device)[:num]
rand_inds = gallery[perm]
if not is_tensor:
rand_inds = rand_inds.cpu().numpy()
return rand_inds
def _sample_pos(self, assign_result, num_expected, **kwargs):
"""Randomly sample some positive samples."""
pos_inds = torch.nonzero(assign_result.gt_inds > 0).flatten()
if pos_inds.numel() <= num_expected:
return pos_inds
else:
return self.random_choice(pos_inds, num_expected)
def _sample_neg(self,
assign_result,
num_expected,
bboxes,
feats=None,
img_meta=None,
**kwargs):
"""Sample negative samples.
Score-HLR sampler is done in the following steps:
1. Take the maximum positive score prediction of each negative samples
as s_i.
2. Filter out negative samples whose s_i <= score_thr, the left samples
are called valid samples.
3. Use NMS-Match to divide valid samples into different groups,
samples in the same group will greatly overlap with each other
4. Rank the matched samples in two-steps to get Score-HLR.
(1) In the same group, rank samples with their scores.
(2) In the same score rank across different groups,
rank samples with their scores again.
5. Linearly map Score-HLR to the final label weights.
Args:
assign_result (:obj:`AssignResult`): result of assigner.
num_expected (int): Expected number of samples.
bboxes (Tensor): bbox to be sampled.
feats (Tensor): Features come from FPN.
img_meta (dict): Meta information dictionary.
"""
neg_inds = torch.nonzero(assign_result.gt_inds == 0).flatten()
num_neg = neg_inds.size(0)
if num_neg == 0:
return neg_inds, None
with torch.no_grad():
neg_bboxes = bboxes[neg_inds]
neg_rois = bbox2roi([neg_bboxes])
bbox_result = self.context._bbox_forward(feats, neg_rois)
cls_score, bbox_pred = bbox_result['cls_score'], bbox_result[
'bbox_pred']
ori_loss = self.bbox_head.loss(
cls_score=cls_score,
bbox_pred=None,
rois=None,
labels=neg_inds.new_full((num_neg, ),
self.bbox_head.num_classes),
label_weights=cls_score.new_ones(num_neg),
bbox_targets=None,
bbox_weights=None,
reduction_override='none')['loss_cls']
# filter out samples with the max score lower than score_thr
max_score, argmax_score = cls_score.softmax(-1)[:, :-1].max(-1)
valid_inds = (max_score > self.score_thr).nonzero().view(-1)
invalid_inds = (max_score <= self.score_thr).nonzero().view(-1)
num_valid = valid_inds.size(0)
num_invalid = invalid_inds.size(0)
num_expected = min(num_neg, num_expected)
num_hlr = min(num_valid, num_expected)
num_rand = num_expected - num_hlr
if num_valid > 0:
valid_rois = neg_rois[valid_inds]
valid_max_score = max_score[valid_inds]
valid_argmax_score = argmax_score[valid_inds]
valid_bbox_pred = bbox_pred[valid_inds]
# valid_bbox_pred shape: [num_valid, #num_classes, 4]
valid_bbox_pred = valid_bbox_pred.view(
valid_bbox_pred.size(0), -1, 4)
selected_bbox_pred = valid_bbox_pred[range(num_valid),
valid_argmax_score]
pred_bboxes = self.bbox_head.bbox_coder.decode(
valid_rois[:, 1:], selected_bbox_pred)
pred_bboxes_with_score = torch.cat(
[pred_bboxes, valid_max_score[:, None]], -1)
group = nms_match(pred_bboxes_with_score, self.iou_thr)
# imp: importance
imp = cls_score.new_zeros(num_valid)
for g in group:
g_score = valid_max_score[g]
# g_score has already sorted
rank = g_score.new_tensor(range(g_score.size(0)))
imp[g] = num_valid - rank + g_score
_, imp_rank_inds = imp.sort(descending=True)
_, imp_rank = imp_rank_inds.sort()
hlr_inds = imp_rank_inds[:num_expected]
if num_rand > 0:
rand_inds = torch.randperm(num_invalid)[:num_rand]
select_inds = torch.cat(
[valid_inds[hlr_inds], invalid_inds[rand_inds]])
else:
select_inds = valid_inds[hlr_inds]
neg_label_weights = cls_score.new_ones(num_expected)
up_bound = max(num_expected, num_valid)
imp_weights = (up_bound -
imp_rank[hlr_inds].float()) / up_bound
neg_label_weights[:num_hlr] = imp_weights
neg_label_weights[num_hlr:] = imp_weights.min()
neg_label_weights = (self.bias +
(1 - self.bias) * neg_label_weights).pow(
self.k)
ori_selected_loss = ori_loss[select_inds]
new_loss = ori_selected_loss * neg_label_weights
norm_ratio = ori_selected_loss.sum() / new_loss.sum()
neg_label_weights *= norm_ratio
else:
neg_label_weights = cls_score.new_ones(num_expected)
select_inds = torch.randperm(num_neg)[:num_expected]
return neg_inds[select_inds], neg_label_weights
def sample(self,
assign_result,
bboxes,
gt_bboxes,
gt_labels=None,
img_meta=None,
**kwargs):
"""Sample positive and negative bboxes.
This is a simple implementation of bbox sampling given candidates,
assigning results and ground truth bboxes.
Args:
assign_result (:obj:`AssignResult`): Bbox assigning results.
bboxes (Tensor): Boxes to be sampled from.
gt_bboxes (Tensor): Ground truth bboxes.
gt_labels (Tensor, optional): Class labels of ground truth bboxes.
Returns:
tuple[:obj:`SamplingResult`, Tensor]: Sampling result and negetive
label weights.
"""
bboxes = bboxes[:, :4]
gt_flags = bboxes.new_zeros((bboxes.shape[0], ), dtype=torch.uint8)
if self.add_gt_as_proposals:
bboxes = torch.cat([gt_bboxes, bboxes], dim=0)
assign_result.add_gt_(gt_labels)
gt_ones = bboxes.new_ones(gt_bboxes.shape[0], dtype=torch.uint8)
gt_flags = torch.cat([gt_ones, gt_flags])
num_expected_pos = int(self.num * self.pos_fraction)
pos_inds = self.pos_sampler._sample_pos(
assign_result, num_expected_pos, bboxes=bboxes, **kwargs)
num_sampled_pos = pos_inds.numel()
num_expected_neg = self.num - num_sampled_pos
if self.neg_pos_ub >= 0:
_pos = max(1, num_sampled_pos)
neg_upper_bound = int(self.neg_pos_ub * _pos)
if num_expected_neg > neg_upper_bound:
num_expected_neg = neg_upper_bound
neg_inds, neg_label_weights = self.neg_sampler._sample_neg(
assign_result,
num_expected_neg,
bboxes,
img_meta=img_meta,
**kwargs)
return SamplingResult(pos_inds, neg_inds, bboxes, gt_bboxes,
assign_result, gt_flags), neg_label_weights
import numpy as np
import torch
def bbox_flip(bboxes, img_shape, direction='horizontal'):
"""Flip bboxes horizontally or vertically.
Args:
bboxes (Tensor): Shape (..., 4*k)
img_shape (tuple): Image shape.
direction (str): Flip direction, options are "horizontal", "vertical",
"diagonal". Default: "horizontal"
Returns:
Tensor: Flipped bboxes.
"""
assert bboxes.shape[-1] % 4 == 0
assert direction in ['horizontal', 'vertical', 'diagonal']
flipped = bboxes.clone()
if direction == 'horizontal':
flipped[..., 0::4] = img_shape[1] - bboxes[..., 2::4]
flipped[..., 2::4] = img_shape[1] - bboxes[..., 0::4]
elif direction == 'vertical':
flipped[..., 1::4] = img_shape[0] - bboxes[..., 3::4]
flipped[..., 3::4] = img_shape[0] - bboxes[..., 1::4]
else:
flipped[..., 0::4] = img_shape[1] - bboxes[..., 2::4]
flipped[..., 1::4] = img_shape[0] - bboxes[..., 3::4]
flipped[..., 2::4] = img_shape[1] - bboxes[..., 0::4]
flipped[..., 3::4] = img_shape[0] - bboxes[..., 1::4]
return flipped
def bbox_mapping(bboxes,
img_shape,
scale_factor,
flip,
flip_direction='horizontal'):
"""Map bboxes from the original image scale to testing scale."""
new_bboxes = bboxes * bboxes.new_tensor(scale_factor)
if flip:
new_bboxes = bbox_flip(new_bboxes, img_shape, flip_direction)
return new_bboxes
def bbox_mapping_back(bboxes,
img_shape,
scale_factor,
flip,
flip_direction='horizontal'):
"""Map bboxes from testing scale to original image scale."""
new_bboxes = bbox_flip(bboxes, img_shape,
flip_direction) if flip else bboxes
new_bboxes = new_bboxes.view(-1, 4) / new_bboxes.new_tensor(scale_factor)
return new_bboxes.view(bboxes.shape)
def bbox2roi(bbox_list):
"""Convert a list of bboxes to roi format.
Args:
bbox_list (list[Tensor]): a list of bboxes corresponding to a batch
of images.
Returns:
Tensor: shape (n, 5), [batch_ind, x1, y1, x2, y2]
"""
rois_list = []
for img_id, bboxes in enumerate(bbox_list):
if bboxes.size(0) > 0:
img_inds = bboxes.new_full((bboxes.size(0), 1), img_id)
rois = torch.cat([img_inds, bboxes[:, :4]], dim=-1)
else:
rois = bboxes.new_zeros((0, 5))
rois_list.append(rois)
rois = torch.cat(rois_list, 0)
return rois
def roi2bbox(rois):
"""Convert rois to bounding box format.
Args:
rois (torch.Tensor): RoIs with the shape (n, 5) where the first
column indicates batch id of each RoI.
Returns:
list[torch.Tensor]: Converted boxes of corresponding rois.
"""
bbox_list = []
img_ids = torch.unique(rois[:, 0].cpu(), sorted=True)
for img_id in img_ids:
inds = (rois[:, 0] == img_id.item())
bbox = rois[inds, 1:]
bbox_list.append(bbox)
return bbox_list
def bbox2result(bboxes, labels, num_classes):
"""Convert detection results to a list of numpy arrays.
Args:
bboxes (torch.Tensor | np.ndarray): shape (n, 5)
labels (torch.Tensor | np.ndarray): shape (n, )
num_classes (int): class number, including background class
Returns:
list(ndarray): bbox results of each class
"""
if bboxes.shape[0] == 0:
return [np.zeros((0, 5), dtype=np.float32) for i in range(num_classes)]
else:
if isinstance(bboxes, torch.Tensor):
bboxes = bboxes.detach().cpu().numpy()
labels = labels.detach().cpu().numpy()
return [bboxes[labels == i, :] for i in range(num_classes)]
def distance2bbox(points, distance, max_shape=None):
"""Decode distance prediction to bounding box.
Args:
points (Tensor): Shape (n, 2), [x, y].
distance (Tensor): Distance from the given point to 4
boundaries (left, top, right, bottom).
max_shape (tuple): Shape of the image.
Returns:
Tensor: Decoded bboxes.
"""
x1 = points[:, 0] - distance[:, 0]
y1 = points[:, 1] - distance[:, 1]
x2 = points[:, 0] + distance[:, 2]
y2 = points[:, 1] + distance[:, 3]
if max_shape is not None:
x1 = x1.clamp(min=0, max=max_shape[1])
y1 = y1.clamp(min=0, max=max_shape[0])
x2 = x2.clamp(min=0, max=max_shape[1])
y2 = y2.clamp(min=0, max=max_shape[0])
return torch.stack([x1, y1, x2, y2], -1)
def bbox2distance(points, bbox, max_dis=None, eps=0.1):
"""Decode bounding box based on distances.
Args:
points (Tensor): Shape (n, 2), [x, y].
bbox (Tensor): Shape (n, 4), "xyxy" format
max_dis (float): Upper bound of the distance.
eps (float): a small value to ensure target < max_dis, instead <=
Returns:
Tensor: Decoded distances.
"""
left = points[:, 0] - bbox[:, 0]
top = points[:, 1] - bbox[:, 1]
right = bbox[:, 2] - points[:, 0]
bottom = bbox[:, 3] - points[:, 1]
if max_dis is not None:
left = left.clamp(min=0, max=max_dis - eps)
top = top.clamp(min=0, max=max_dis - eps)
right = right.clamp(min=0, max=max_dis - eps)
bottom = bottom.clamp(min=0, max=max_dis - eps)
return torch.stack([left, top, right, bottom], -1)
def bbox_rescale(bboxes, scale_factor=1.0):
"""Rescale bounding box w.r.t. scale_factor.
Args:
bboxes (Tensor): Shape (n, 4) for bboxes or (n, 5) for rois
scale_factor (float): rescale factor
Returns:
Tensor: Rescaled bboxes.
"""
if bboxes.size(1) == 5:
bboxes_ = bboxes[:, 1:]
inds_ = bboxes[:, 0]
else:
bboxes_ = bboxes
cx = (bboxes_[:, 0] + bboxes_[:, 2]) * 0.5
cy = (bboxes_[:, 1] + bboxes_[:, 3]) * 0.5
w = bboxes_[:, 2] - bboxes_[:, 0]
h = bboxes_[:, 3] - bboxes_[:, 1]
w = w * scale_factor
h = h * scale_factor
x1 = cx - 0.5 * w
x2 = cx + 0.5 * w
y1 = cy - 0.5 * h
y2 = cy + 0.5 * h
if bboxes.size(1) == 5:
rescaled_bboxes = torch.stack([inds_, x1, y1, x2, y2], dim=-1)
else:
rescaled_bboxes = torch.stack([x1, y1, x2, y2], dim=-1)
return rescaled_bboxes
def bbox_cxcywh_to_xyxy(bbox):
"""Convert bbox coordinates from (cx, cy, w, h) to (x1, y1, x2, y2).
Args:
bbox (Tensor): Shape (n, 4) for bboxes.
Returns:
Tensor: Converted bboxes.
"""
cx, cy, w, h = bbox.split((1, 1, 1, 1), dim=-1)
bbox_new = [(cx - 0.5 * w), (cy - 0.5 * h), (cx + 0.5 * w), (cy + 0.5 * h)]
return torch.cat(bbox_new, dim=-1)
def bbox_xyxy_to_cxcywh(bbox):
"""Convert bbox coordinates from (x1, y1, x2, y2) to (cx, cy, w, h).
Args:
bbox (Tensor): Shape (n, 4) for bboxes.
Returns:
Tensor: Converted bboxes.
"""
x1, y1, x2, y2 = bbox.split((1, 1, 1, 1), dim=-1)
bbox_new = [(x1 + x2) / 2, (y1 + y2) / 2, (x2 - x1), (y2 - y1)]
return torch.cat(bbox_new, dim=-1)
from .class_names import (cityscapes_classes, coco_classes, dataset_aliases,
get_classes, imagenet_det_classes,
imagenet_vid_classes, voc_classes)
from .eval_hooks import DistEvalHook, EvalHook
from .mean_ap import average_precision, eval_map, print_map_summary
from .recall import (eval_recalls, plot_iou_recall, plot_num_recall,
print_recall_summary)
__all__ = [
'voc_classes', 'imagenet_det_classes', 'imagenet_vid_classes',
'coco_classes', 'cityscapes_classes', 'dataset_aliases', 'get_classes',
'DistEvalHook', 'EvalHook', 'average_precision', 'eval_map',
'print_map_summary', 'eval_recalls', 'print_recall_summary',
'plot_num_recall', 'plot_iou_recall'
]
import numpy as np
def bbox_overlaps(bboxes1, bboxes2, mode='iou', eps=1e-6):
"""Calculate the ious between each bbox of bboxes1 and bboxes2.
Args:
bboxes1(ndarray): shape (n, 4)
bboxes2(ndarray): shape (k, 4)
mode(str): iou (intersection over union) or iof (intersection
over foreground)
Returns:
ious(ndarray): shape (n, k)
"""
assert mode in ['iou', 'iof']
bboxes1 = bboxes1.astype(np.float32)
bboxes2 = bboxes2.astype(np.float32)
rows = bboxes1.shape[0]
cols = bboxes2.shape[0]
ious = np.zeros((rows, cols), dtype=np.float32)
if rows * cols == 0:
return ious
exchange = False
if bboxes1.shape[0] > bboxes2.shape[0]:
bboxes1, bboxes2 = bboxes2, bboxes1
ious = np.zeros((cols, rows), dtype=np.float32)
exchange = True
area1 = (bboxes1[:, 2] - bboxes1[:, 0]) * (bboxes1[:, 3] - bboxes1[:, 1])
area2 = (bboxes2[:, 2] - bboxes2[:, 0]) * (bboxes2[:, 3] - bboxes2[:, 1])
for i in range(bboxes1.shape[0]):
x_start = np.maximum(bboxes1[i, 0], bboxes2[:, 0])
y_start = np.maximum(bboxes1[i, 1], bboxes2[:, 1])
x_end = np.minimum(bboxes1[i, 2], bboxes2[:, 2])
y_end = np.minimum(bboxes1[i, 3], bboxes2[:, 3])
overlap = np.maximum(x_end - x_start, 0) * np.maximum(
y_end - y_start, 0)
if mode == 'iou':
union = area1[i] + area2 - overlap
else:
union = area1[i] if not exchange else area2
union = np.maximum(union, eps)
ious[i, :] = overlap / union
if exchange:
ious = ious.T
return ious
import mmcv
def wider_face_classes():
return ['face']
def voc_classes():
return [
'aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat',
'chair', 'cow', 'diningtable', 'dog', 'horse', 'motorbike', 'person',
'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor'
]
def imagenet_det_classes():
return [
'accordion', 'airplane', 'ant', 'antelope', 'apple', 'armadillo',
'artichoke', 'axe', 'baby_bed', 'backpack', 'bagel', 'balance_beam',
'banana', 'band_aid', 'banjo', 'baseball', 'basketball', 'bathing_cap',
'beaker', 'bear', 'bee', 'bell_pepper', 'bench', 'bicycle', 'binder',
'bird', 'bookshelf', 'bow_tie', 'bow', 'bowl', 'brassiere', 'burrito',
'bus', 'butterfly', 'camel', 'can_opener', 'car', 'cart', 'cattle',
'cello', 'centipede', 'chain_saw', 'chair', 'chime', 'cocktail_shaker',
'coffee_maker', 'computer_keyboard', 'computer_mouse', 'corkscrew',
'cream', 'croquet_ball', 'crutch', 'cucumber', 'cup_or_mug', 'diaper',
'digital_clock', 'dishwasher', 'dog', 'domestic_cat', 'dragonfly',
'drum', 'dumbbell', 'electric_fan', 'elephant', 'face_powder', 'fig',
'filing_cabinet', 'flower_pot', 'flute', 'fox', 'french_horn', 'frog',
'frying_pan', 'giant_panda', 'goldfish', 'golf_ball', 'golfcart',
'guacamole', 'guitar', 'hair_dryer', 'hair_spray', 'hamburger',
'hammer', 'hamster', 'harmonica', 'harp', 'hat_with_a_wide_brim',
'head_cabbage', 'helmet', 'hippopotamus', 'horizontal_bar', 'horse',
'hotdog', 'iPod', 'isopod', 'jellyfish', 'koala_bear', 'ladle',
'ladybug', 'lamp', 'laptop', 'lemon', 'lion', 'lipstick', 'lizard',
'lobster', 'maillot', 'maraca', 'microphone', 'microwave', 'milk_can',
'miniskirt', 'monkey', 'motorcycle', 'mushroom', 'nail', 'neck_brace',
'oboe', 'orange', 'otter', 'pencil_box', 'pencil_sharpener', 'perfume',
'person', 'piano', 'pineapple', 'ping-pong_ball', 'pitcher', 'pizza',
'plastic_bag', 'plate_rack', 'pomegranate', 'popsicle', 'porcupine',
'power_drill', 'pretzel', 'printer', 'puck', 'punching_bag', 'purse',
'rabbit', 'racket', 'ray', 'red_panda', 'refrigerator',
'remote_control', 'rubber_eraser', 'rugby_ball', 'ruler',
'salt_or_pepper_shaker', 'saxophone', 'scorpion', 'screwdriver',
'seal', 'sheep', 'ski', 'skunk', 'snail', 'snake', 'snowmobile',
'snowplow', 'soap_dispenser', 'soccer_ball', 'sofa', 'spatula',
'squirrel', 'starfish', 'stethoscope', 'stove', 'strainer',
'strawberry', 'stretcher', 'sunglasses', 'swimming_trunks', 'swine',
'syringe', 'table', 'tape_player', 'tennis_ball', 'tick', 'tie',
'tiger', 'toaster', 'traffic_light', 'train', 'trombone', 'trumpet',
'turtle', 'tv_or_monitor', 'unicycle', 'vacuum', 'violin',
'volleyball', 'waffle_iron', 'washer', 'water_bottle', 'watercraft',
'whale', 'wine_bottle', 'zebra'
]
def imagenet_vid_classes():
return [
'airplane', 'antelope', 'bear', 'bicycle', 'bird', 'bus', 'car',
'cattle', 'dog', 'domestic_cat', 'elephant', 'fox', 'giant_panda',
'hamster', 'horse', 'lion', 'lizard', 'monkey', 'motorcycle', 'rabbit',
'red_panda', 'sheep', 'snake', 'squirrel', 'tiger', 'train', 'turtle',
'watercraft', 'whale', 'zebra'
]
def coco_classes():
return [
'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train',
'truck', 'boat', 'traffic_light', 'fire_hydrant', 'stop_sign',
'parking_meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep',
'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella',
'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard',
'sports_ball', 'kite', 'baseball_bat', 'baseball_glove', 'skateboard',
'surfboard', 'tennis_racket', 'bottle', 'wine_glass', 'cup', 'fork',
'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
'broccoli', 'carrot', 'hot_dog', 'pizza', 'donut', 'cake', 'chair',
'couch', 'potted_plant', 'bed', 'dining_table', 'toilet', 'tv',
'laptop', 'mouse', 'remote', 'keyboard', 'cell_phone', 'microwave',
'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase',
'scissors', 'teddy_bear', 'hair_drier', 'toothbrush'
]
def cityscapes_classes():
return [
'person', 'rider', 'car', 'truck', 'bus', 'train', 'motorcycle',
'bicycle'
]
dataset_aliases = {
'voc': ['voc', 'pascal_voc', 'voc07', 'voc12'],
'imagenet_det': ['det', 'imagenet_det', 'ilsvrc_det'],
'imagenet_vid': ['vid', 'imagenet_vid', 'ilsvrc_vid'],
'coco': ['coco', 'mscoco', 'ms_coco'],
'wider_face': ['WIDERFaceDataset', 'wider_face', 'WDIERFace'],
'cityscapes': ['cityscapes']
}
def get_classes(dataset):
"""Get class names of a dataset."""
alias2name = {}
for name, aliases in dataset_aliases.items():
for alias in aliases:
alias2name[alias] = name
if mmcv.is_str(dataset):
if dataset in alias2name:
labels = eval(alias2name[dataset] + '_classes()')
else:
raise ValueError(f'Unrecognized dataset: {dataset}')
else:
raise TypeError(f'dataset must a str, but got {type(dataset)}')
return labels
import os.path as osp
import warnings
from math import inf
import mmcv
from mmcv.runner import Hook
from torch.utils.data import DataLoader
from mmdet.utils import get_root_logger
class EvalHook(Hook):
"""Evaluation hook.
Notes:
If new arguments are added for EvalHook, tools/test.py,
tools/analysis_tools/eval_metric.py may be effected.
Attributes:
dataloader (DataLoader): A PyTorch dataloader.
start (int, optional): Evaluation starting epoch. It enables evaluation
before the training starts if ``start`` <= the resuming epoch.
If None, whether to evaluate is merely decided by ``interval``.
Default: None.
interval (int): Evaluation interval (by epochs). Default: 1.
save_best (str, optional): If a metric is specified, it would measure
the best checkpoint during evaluation. The information about best
checkpoint would be save in best.json.
Options are the evaluation metrics to the test dataset. e.g.,
``bbox_mAP``, ``segm_mAP`` for bbox detection and instance
segmentation. ``AR@100`` for proposal recall. If ``save_best`` is
``auto``, the first key will be used. The interval of
``CheckpointHook`` should device EvalHook. Default: None.
rule (str, optional): Comparison rule for best score. If set to None,
it will infer a reasonable rule. Keys such as 'mAP' or 'AR' will
be inferred by 'greater' rule. Keys contain 'loss' will be inferred
by 'less' rule. Options are 'greater', 'less'. Default: None.
**eval_kwargs: Evaluation arguments fed into the evaluate function of
the dataset.
"""
rule_map = {'greater': lambda x, y: x > y, 'less': lambda x, y: x < y}
init_value_map = {'greater': -inf, 'less': inf}
greater_keys = ['mAP', 'AR']
less_keys = ['loss']
def __init__(self,
dataloader,
start=None,
interval=1,
save_best=None,
rule=None,
**eval_kwargs):
if not isinstance(dataloader, DataLoader):
raise TypeError('dataloader must be a pytorch DataLoader, but got'
f' {type(dataloader)}')
if not interval > 0:
raise ValueError(f'interval must be positive, but got {interval}')
if start is not None and start < 0:
warnings.warn(
f'The evaluation start epoch {start} is smaller than 0, '
f'use 0 instead', UserWarning)
start = 0
self.dataloader = dataloader
self.interval = interval
self.start = start
assert isinstance(save_best, str) or save_best is None
self.save_best = save_best
self.eval_kwargs = eval_kwargs
self.initial_epoch_flag = True
self.logger = get_root_logger()
if self.save_best is not None:
self._init_rule(rule, self.save_best)
def _init_rule(self, rule, key_indicator):
"""Initialize rule, key_indicator, comparison_func, and best score.
Args:
rule (str | None): Comparison rule for best score.
key_indicator (str | None): Key indicator to determine the
comparison rule.
"""
if rule not in self.rule_map and rule is not None:
raise KeyError(f'rule must be greater, less or None, '
f'but got {rule}.')
if rule is None:
if key_indicator != 'auto':
if any(key in key_indicator for key in self.greater_keys):
rule = 'greater'
elif any(key in key_indicator for key in self.less_keys):
rule = 'less'
else:
raise ValueError(f'Cannot infer the rule for key '
f'{key_indicator}, thus a specific rule '
f'must be specified.')
self.rule = rule
self.key_indicator = key_indicator
if self.rule is not None:
self.compare_func = self.rule_map[self.rule]
def before_run(self, runner):
if self.save_best is not None:
if runner.meta is None:
warnings.warn('runner.meta is None. Creating a empty one.')
runner.meta = dict()
runner.meta.setdefault('hook_msgs', dict())
def before_train_epoch(self, runner):
"""Evaluate the model only at the start of training."""
if not self.initial_epoch_flag:
return
if self.start is not None and runner.epoch >= self.start:
self.after_train_epoch(runner)
self.initial_epoch_flag = False
def evaluation_flag(self, runner):
"""Judge whether to perform_evaluation after this epoch.
Returns:
bool: The flag indicating whether to perform evaluation.
"""
if self.start is None:
if not self.every_n_epochs(runner, self.interval):
# No evaluation during the interval epochs.
return False
elif (runner.epoch + 1) < self.start:
# No evaluation if start is larger than the current epoch.
return False
else:
# Evaluation only at epochs 3, 5, 7... if start==3 and interval==2
if (runner.epoch + 1 - self.start) % self.interval:
return False
return True
def after_train_epoch(self, runner):
if not self.evaluation_flag(runner):
return
from mmdet.apis import single_gpu_test
results = single_gpu_test(runner.model, self.dataloader, show=False)
key_score = self.evaluate(runner, results)
if self.save_best:
best_score = runner.meta['hook_msgs'].get(
'best_score', self.init_value_map[self.rule])
if self.compare_func(key_score, best_score):
best_score = key_score
runner.meta['hook_msgs']['best_score'] = best_score
last_ckpt = runner.meta['hook_msgs']['last_ckpt']
runner.meta['hook_msgs']['best_ckpt'] = last_ckpt
mmcv.symlink(
last_ckpt,
osp.join(runner.work_dir,
f'best_{self.key_indicator}.pth'))
self.logger.info(
f'Now best checkpoint is epoch_{runner.epoch + 1}.pth.'
f'Best {self.key_indicator} is {best_score:0.4f}')
def evaluate(self, runner, results):
eval_res = self.dataloader.dataset.evaluate(
results, logger=runner.logger, **self.eval_kwargs)
for name, val in eval_res.items():
runner.log_buffer.output[name] = val
runner.log_buffer.ready = True
if self.save_best is not None:
if self.key_indicator == 'auto':
# infer from eval_results
self._init_rule(self.rule, list(eval_res.keys())[0])
return eval_res[self.key_indicator]
else:
return None
class DistEvalHook(EvalHook):
"""Distributed evaluation hook.
Notes:
If new arguments are added, tools/test.py may be effected.
Attributes:
dataloader (DataLoader): A PyTorch dataloader.
start (int, optional): Evaluation starting epoch. It enables evaluation
before the training starts if ``start`` <= the resuming epoch.
If None, whether to evaluate is merely decided by ``interval``.
Default: None.
interval (int): Evaluation interval (by epochs). Default: 1.
tmpdir (str | None): Temporary directory to save the results of all
processes. Default: None.
gpu_collect (bool): Whether to use gpu or cpu to collect results.
Default: False.
save_best (str, optional): If a metric is specified, it would measure
the best checkpoint during evaluation. The information about best
checkpoint would be save in best.json.
Options are the evaluation metrics to the test dataset. e.g.,
``bbox_mAP``, ``segm_mAP`` for bbox detection and instance
segmentation. ``AR@100`` for proposal recall. If ``save_best`` is
``auto``, the first key will be used. The interval of
``CheckpointHook`` should device EvalHook. Default: None.
rule (str | None): Comparison rule for best score. If set to None,
it will infer a reasonable rule. Default: 'None'.
**eval_kwargs: Evaluation arguments fed into the evaluate function of
the dataset.
"""
def __init__(self,
dataloader,
start=None,
interval=1,
tmpdir=None,
gpu_collect=False,
save_best=None,
rule=None,
**eval_kwargs):
super().__init__(
dataloader,
start=start,
interval=interval,
save_best=save_best,
rule=rule,
**eval_kwargs)
self.tmpdir = tmpdir
self.gpu_collect = gpu_collect
def after_train_epoch(self, runner):
if not self.evaluation_flag(runner):
return
from mmdet.apis import multi_gpu_test
tmpdir = self.tmpdir
if tmpdir is None:
tmpdir = osp.join(runner.work_dir, '.eval_hook')
results = multi_gpu_test(
runner.model,
self.dataloader,
tmpdir=tmpdir,
gpu_collect=self.gpu_collect)
if runner.rank == 0:
print('\n')
key_score = self.evaluate(runner, results)
if self.save_best:
best_score = runner.meta['hook_msgs'].get(
'best_score', self.init_value_map[self.rule])
if self.compare_func(key_score, best_score):
best_score = key_score
runner.meta['hook_msgs']['best_score'] = best_score
last_ckpt = runner.meta['hook_msgs']['last_ckpt']
runner.meta['hook_msgs']['best_ckpt'] = last_ckpt
mmcv.symlink(
last_ckpt,
osp.join(runner.work_dir,
f'best_{self.key_indicator}.pth'))
self.logger.info(
f'Now best checkpoint is {last_ckpt}.'
f'Best {self.key_indicator} is {best_score:0.4f}')
from multiprocessing import Pool
import mmcv
import numpy as np
from mmcv.utils import print_log
from terminaltables import AsciiTable
from .bbox_overlaps import bbox_overlaps
from .class_names import get_classes
def average_precision(recalls, precisions, mode='area'):
"""Calculate average precision (for single or multiple scales).
Args:
recalls (ndarray): shape (num_scales, num_dets) or (num_dets, )
precisions (ndarray): shape (num_scales, num_dets) or (num_dets, )
mode (str): 'area' or '11points', 'area' means calculating the area
under precision-recall curve, '11points' means calculating
the average precision of recalls at [0, 0.1, ..., 1]
Returns:
float or ndarray: calculated average precision
"""
no_scale = False
if recalls.ndim == 1:
no_scale = True
recalls = recalls[np.newaxis, :]
precisions = precisions[np.newaxis, :]
assert recalls.shape == precisions.shape and recalls.ndim == 2
num_scales = recalls.shape[0]
ap = np.zeros(num_scales, dtype=np.float32)
if mode == 'area':
zeros = np.zeros((num_scales, 1), dtype=recalls.dtype)
ones = np.ones((num_scales, 1), dtype=recalls.dtype)
mrec = np.hstack((zeros, recalls, ones))
mpre = np.hstack((zeros, precisions, zeros))
for i in range(mpre.shape[1] - 1, 0, -1):
mpre[:, i - 1] = np.maximum(mpre[:, i - 1], mpre[:, i])
for i in range(num_scales):
ind = np.where(mrec[i, 1:] != mrec[i, :-1])[0]
ap[i] = np.sum(
(mrec[i, ind + 1] - mrec[i, ind]) * mpre[i, ind + 1])
elif mode == '11points':
for i in range(num_scales):
for thr in np.arange(0, 1 + 1e-3, 0.1):
precs = precisions[i, recalls[i, :] >= thr]
prec = precs.max() if precs.size > 0 else 0
ap[i] += prec
ap /= 11
else:
raise ValueError(
'Unrecognized mode, only "area" and "11points" are supported')
if no_scale:
ap = ap[0]
return ap
def tpfp_imagenet(det_bboxes,
gt_bboxes,
gt_bboxes_ignore=None,
default_iou_thr=0.5,
area_ranges=None):
"""Check if detected bboxes are true positive or false positive.
Args:
det_bbox (ndarray): Detected bboxes of this image, of shape (m, 5).
gt_bboxes (ndarray): GT bboxes of this image, of shape (n, 4).
gt_bboxes_ignore (ndarray): Ignored gt bboxes of this image,
of shape (k, 4). Default: None
default_iou_thr (float): IoU threshold to be considered as matched for
medium and large bboxes (small ones have special rules).
Default: 0.5.
area_ranges (list[tuple] | None): Range of bbox areas to be evaluated,
in the format [(min1, max1), (min2, max2), ...]. Default: None.
Returns:
tuple[np.ndarray]: (tp, fp) whose elements are 0 and 1. The shape of
each array is (num_scales, m).
"""
# an indicator of ignored gts
gt_ignore_inds = np.concatenate(
(np.zeros(gt_bboxes.shape[0], dtype=np.bool),
np.ones(gt_bboxes_ignore.shape[0], dtype=np.bool)))
# stack gt_bboxes and gt_bboxes_ignore for convenience
gt_bboxes = np.vstack((gt_bboxes, gt_bboxes_ignore))
num_dets = det_bboxes.shape[0]
num_gts = gt_bboxes.shape[0]
if area_ranges is None:
area_ranges = [(None, None)]
num_scales = len(area_ranges)
# tp and fp are of shape (num_scales, num_gts), each row is tp or fp
# of a certain scale.
tp = np.zeros((num_scales, num_dets), dtype=np.float32)
fp = np.zeros((num_scales, num_dets), dtype=np.float32)
if gt_bboxes.shape[0] == 0:
if area_ranges == [(None, None)]:
fp[...] = 1
else:
det_areas = (det_bboxes[:, 2] - det_bboxes[:, 0]) * (
det_bboxes[:, 3] - det_bboxes[:, 1])
for i, (min_area, max_area) in enumerate(area_ranges):
fp[i, (det_areas >= min_area) & (det_areas < max_area)] = 1
return tp, fp
ious = bbox_overlaps(det_bboxes, gt_bboxes - 1)
gt_w = gt_bboxes[:, 2] - gt_bboxes[:, 0]
gt_h = gt_bboxes[:, 3] - gt_bboxes[:, 1]
iou_thrs = np.minimum((gt_w * gt_h) / ((gt_w + 10.0) * (gt_h + 10.0)),
default_iou_thr)
# sort all detections by scores in descending order
sort_inds = np.argsort(-det_bboxes[:, -1])
for k, (min_area, max_area) in enumerate(area_ranges):
gt_covered = np.zeros(num_gts, dtype=bool)
# if no area range is specified, gt_area_ignore is all False
if min_area is None:
gt_area_ignore = np.zeros_like(gt_ignore_inds, dtype=bool)
else:
gt_areas = gt_w * gt_h
gt_area_ignore = (gt_areas < min_area) | (gt_areas >= max_area)
for i in sort_inds:
max_iou = -1
matched_gt = -1
# find best overlapped available gt
for j in range(num_gts):
# different from PASCAL VOC: allow finding other gts if the
# best overlaped ones are already matched by other det bboxes
if gt_covered[j]:
continue
elif ious[i, j] >= iou_thrs[j] and ious[i, j] > max_iou:
max_iou = ious[i, j]
matched_gt = j
# there are 4 cases for a det bbox:
# 1. it matches a gt, tp = 1, fp = 0
# 2. it matches an ignored gt, tp = 0, fp = 0
# 3. it matches no gt and within area range, tp = 0, fp = 1
# 4. it matches no gt but is beyond area range, tp = 0, fp = 0
if matched_gt >= 0:
gt_covered[matched_gt] = 1
if not (gt_ignore_inds[matched_gt]
or gt_area_ignore[matched_gt]):
tp[k, i] = 1
elif min_area is None:
fp[k, i] = 1
else:
bbox = det_bboxes[i, :4]
area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
if area >= min_area and area < max_area:
fp[k, i] = 1
return tp, fp
def tpfp_default(det_bboxes,
gt_bboxes,
gt_bboxes_ignore=None,
iou_thr=0.5,
area_ranges=None):
"""Check if detected bboxes are true positive or false positive.
Args:
det_bbox (ndarray): Detected bboxes of this image, of shape (m, 5).
gt_bboxes (ndarray): GT bboxes of this image, of shape (n, 4).
gt_bboxes_ignore (ndarray): Ignored gt bboxes of this image,
of shape (k, 4). Default: None
iou_thr (float): IoU threshold to be considered as matched.
Default: 0.5.
area_ranges (list[tuple] | None): Range of bbox areas to be evaluated,
in the format [(min1, max1), (min2, max2), ...]. Default: None.
Returns:
tuple[np.ndarray]: (tp, fp) whose elements are 0 and 1. The shape of
each array is (num_scales, m).
"""
# an indicator of ignored gts
gt_ignore_inds = np.concatenate(
(np.zeros(gt_bboxes.shape[0], dtype=np.bool),
np.ones(gt_bboxes_ignore.shape[0], dtype=np.bool)))
# stack gt_bboxes and gt_bboxes_ignore for convenience
gt_bboxes = np.vstack((gt_bboxes, gt_bboxes_ignore))
num_dets = det_bboxes.shape[0]
num_gts = gt_bboxes.shape[0]
if area_ranges is None:
area_ranges = [(None, None)]
num_scales = len(area_ranges)
# tp and fp are of shape (num_scales, num_gts), each row is tp or fp of
# a certain scale
tp = np.zeros((num_scales, num_dets), dtype=np.float32)
fp = np.zeros((num_scales, num_dets), dtype=np.float32)
# if there is no gt bboxes in this image, then all det bboxes
# within area range are false positives
if gt_bboxes.shape[0] == 0:
if area_ranges == [(None, None)]:
fp[...] = 1
else:
det_areas = (det_bboxes[:, 2] - det_bboxes[:, 0]) * (
det_bboxes[:, 3] - det_bboxes[:, 1])
for i, (min_area, max_area) in enumerate(area_ranges):
fp[i, (det_areas >= min_area) & (det_areas < max_area)] = 1
return tp, fp
ious = bbox_overlaps(det_bboxes, gt_bboxes)
# for each det, the max iou with all gts
ious_max = ious.max(axis=1)
# for each det, which gt overlaps most with it
ious_argmax = ious.argmax(axis=1)
# sort all dets in descending order by scores
sort_inds = np.argsort(-det_bboxes[:, -1])
for k, (min_area, max_area) in enumerate(area_ranges):
gt_covered = np.zeros(num_gts, dtype=bool)
# if no area range is specified, gt_area_ignore is all False
if min_area is None:
gt_area_ignore = np.zeros_like(gt_ignore_inds, dtype=bool)
else:
gt_areas = (gt_bboxes[:, 2] - gt_bboxes[:, 0]) * (
gt_bboxes[:, 3] - gt_bboxes[:, 1])
gt_area_ignore = (gt_areas < min_area) | (gt_areas >= max_area)
for i in sort_inds:
if ious_max[i] >= iou_thr:
matched_gt = ious_argmax[i]
if not (gt_ignore_inds[matched_gt]
or gt_area_ignore[matched_gt]):
if not gt_covered[matched_gt]:
gt_covered[matched_gt] = True
tp[k, i] = 1
else:
fp[k, i] = 1
# otherwise ignore this detected bbox, tp = 0, fp = 0
elif min_area is None:
fp[k, i] = 1
else:
bbox = det_bboxes[i, :4]
area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
if area >= min_area and area < max_area:
fp[k, i] = 1
return tp, fp
def get_cls_results(det_results, annotations, class_id):
"""Get det results and gt information of a certain class.
Args:
det_results (list[list]): Same as `eval_map()`.
annotations (list[dict]): Same as `eval_map()`.
class_id (int): ID of a specific class.
Returns:
tuple[list[np.ndarray]]: detected bboxes, gt bboxes, ignored gt bboxes
"""
cls_dets = [img_res[class_id] for img_res in det_results]
cls_gts = []
cls_gts_ignore = []
for ann in annotations:
gt_inds = ann['labels'] == class_id
cls_gts.append(ann['bboxes'][gt_inds, :])
if ann.get('labels_ignore', None) is not None:
ignore_inds = ann['labels_ignore'] == class_id
cls_gts_ignore.append(ann['bboxes_ignore'][ignore_inds, :])
else:
cls_gts_ignore.append(np.empty((0, 4), dtype=np.float32))
return cls_dets, cls_gts, cls_gts_ignore
def eval_map(det_results,
annotations,
scale_ranges=None,
iou_thr=0.5,
dataset=None,
logger=None,
tpfp_fn=None,
nproc=4):
"""Evaluate mAP of a dataset.
Args:
det_results (list[list]): [[cls1_det, cls2_det, ...], ...].
The outer list indicates images, and the inner list indicates
per-class detected bboxes.
annotations (list[dict]): Ground truth annotations where each item of
the list indicates an image. Keys of annotations are:
- `bboxes`: numpy array of shape (n, 4)
- `labels`: numpy array of shape (n, )
- `bboxes_ignore` (optional): numpy array of shape (k, 4)
- `labels_ignore` (optional): numpy array of shape (k, )
scale_ranges (list[tuple] | None): Range of scales to be evaluated,
in the format [(min1, max1), (min2, max2), ...]. A range of
(32, 64) means the area range between (32**2, 64**2).
Default: None.
iou_thr (float): IoU threshold to be considered as matched.
Default: 0.5.
dataset (list[str] | str | None): Dataset name or dataset classes,
there are minor differences in metrics for different datsets, e.g.
"voc07", "imagenet_det", etc. Default: None.
logger (logging.Logger | str | None): The way to print the mAP
summary. See `mmcv.utils.print_log()` for details. Default: None.
tpfp_fn (callable | None): The function used to determine true/
false positives. If None, :func:`tpfp_default` is used as default
unless dataset is 'det' or 'vid' (:func:`tpfp_imagenet` in this
case). If it is given as a function, then this function is used
to evaluate tp & fp. Default None.
nproc (int): Processes used for computing TP and FP.
Default: 4.
Returns:
tuple: (mAP, [dict, dict, ...])
"""
assert len(det_results) == len(annotations)
num_imgs = len(det_results)
num_scales = len(scale_ranges) if scale_ranges is not None else 1
num_classes = len(det_results[0]) # positive class num
area_ranges = ([(rg[0]**2, rg[1]**2) for rg in scale_ranges]
if scale_ranges is not None else None)
pool = Pool(nproc)
eval_results = []
for i in range(num_classes):
# get gt and det bboxes of this class
cls_dets, cls_gts, cls_gts_ignore = get_cls_results(
det_results, annotations, i)
# choose proper function according to datasets to compute tp and fp
if tpfp_fn is None:
if dataset in ['det', 'vid']:
tpfp_fn = tpfp_imagenet
else:
tpfp_fn = tpfp_default
if not callable(tpfp_fn):
raise ValueError(
f'tpfp_fn has to be a function or None, but got {tpfp_fn}')
# compute tp and fp for each image with multiple processes
tpfp = pool.starmap(
tpfp_fn,
zip(cls_dets, cls_gts, cls_gts_ignore,
[iou_thr for _ in range(num_imgs)],
[area_ranges for _ in range(num_imgs)]))
tp, fp = tuple(zip(*tpfp))
# calculate gt number of each scale
# ignored gts or gts beyond the specific scale are not counted
num_gts = np.zeros(num_scales, dtype=int)
for j, bbox in enumerate(cls_gts):
if area_ranges is None:
num_gts[0] += bbox.shape[0]
else:
gt_areas = (bbox[:, 2] - bbox[:, 0]) * (
bbox[:, 3] - bbox[:, 1])
for k, (min_area, max_area) in enumerate(area_ranges):
num_gts[k] += np.sum((gt_areas >= min_area)
& (gt_areas < max_area))
# sort all det bboxes by score, also sort tp and fp
cls_dets = np.vstack(cls_dets)
num_dets = cls_dets.shape[0]
sort_inds = np.argsort(-cls_dets[:, -1])
tp = np.hstack(tp)[:, sort_inds]
fp = np.hstack(fp)[:, sort_inds]
# calculate recall and precision with tp and fp
tp = np.cumsum(tp, axis=1)
fp = np.cumsum(fp, axis=1)
eps = np.finfo(np.float32).eps
recalls = tp / np.maximum(num_gts[:, np.newaxis], eps)
precisions = tp / np.maximum((tp + fp), eps)
# calculate AP
if scale_ranges is None:
recalls = recalls[0, :]
precisions = precisions[0, :]
num_gts = num_gts.item()
mode = 'area' if dataset != 'voc07' else '11points'
ap = average_precision(recalls, precisions, mode)
eval_results.append({
'num_gts': num_gts,
'num_dets': num_dets,
'recall': recalls,
'precision': precisions,
'ap': ap
})
pool.close()
if scale_ranges is not None:
# shape (num_classes, num_scales)
all_ap = np.vstack([cls_result['ap'] for cls_result in eval_results])
all_num_gts = np.vstack(
[cls_result['num_gts'] for cls_result in eval_results])
mean_ap = []
for i in range(num_scales):
if np.any(all_num_gts[:, i] > 0):
mean_ap.append(all_ap[all_num_gts[:, i] > 0, i].mean())
else:
mean_ap.append(0.0)
else:
aps = []
for cls_result in eval_results:
if cls_result['num_gts'] > 0:
aps.append(cls_result['ap'])
mean_ap = np.array(aps).mean().item() if aps else 0.0
print_map_summary(
mean_ap, eval_results, dataset, area_ranges, logger=logger)
return mean_ap, eval_results
def print_map_summary(mean_ap,
results,
dataset=None,
scale_ranges=None,
logger=None):
"""Print mAP and results of each class.
A table will be printed to show the gts/dets/recall/AP of each class and
the mAP.
Args:
mean_ap (float): Calculated from `eval_map()`.
results (list[dict]): Calculated from `eval_map()`.
dataset (list[str] | str | None): Dataset name or dataset classes.
scale_ranges (list[tuple] | None): Range of scales to be evaluated.
logger (logging.Logger | str | None): The way to print the mAP
summary. See `mmcv.utils.print_log()` for details. Default: None.
"""
if logger == 'silent':
return
if isinstance(results[0]['ap'], np.ndarray):
num_scales = len(results[0]['ap'])
else:
num_scales = 1
if scale_ranges is not None:
assert len(scale_ranges) == num_scales
num_classes = len(results)
recalls = np.zeros((num_scales, num_classes), dtype=np.float32)
aps = np.zeros((num_scales, num_classes), dtype=np.float32)
num_gts = np.zeros((num_scales, num_classes), dtype=int)
for i, cls_result in enumerate(results):
if cls_result['recall'].size > 0:
recalls[:, i] = np.array(cls_result['recall'], ndmin=2)[:, -1]
aps[:, i] = cls_result['ap']
num_gts[:, i] = cls_result['num_gts']
if dataset is None:
label_names = [str(i) for i in range(num_classes)]
elif mmcv.is_str(dataset):
label_names = get_classes(dataset)
else:
label_names = dataset
if not isinstance(mean_ap, list):
mean_ap = [mean_ap]
header = ['class', 'gts', 'dets', 'recall', 'ap']
for i in range(num_scales):
if scale_ranges is not None:
print_log(f'Scale range {scale_ranges[i]}', logger=logger)
table_data = [header]
for j in range(num_classes):
row_data = [
label_names[j], num_gts[i, j], results[j]['num_dets'],
f'{recalls[i, j]:.3f}', f'{aps[i, j]:.3f}'
]
table_data.append(row_data)
table_data.append(['mAP', '', '', '', f'{mean_ap[i]:.3f}'])
table = AsciiTable(table_data)
table.inner_footing_row_border = True
print_log('\n' + table.table, logger=logger)
from collections.abc import Sequence
import numpy as np
from mmcv.utils import print_log
from terminaltables import AsciiTable
from .bbox_overlaps import bbox_overlaps
def _recalls(all_ious, proposal_nums, thrs):
img_num = all_ious.shape[0]
total_gt_num = sum([ious.shape[0] for ious in all_ious])
_ious = np.zeros((proposal_nums.size, total_gt_num), dtype=np.float32)
for k, proposal_num in enumerate(proposal_nums):
tmp_ious = np.zeros(0)
for i in range(img_num):
ious = all_ious[i][:, :proposal_num].copy()
gt_ious = np.zeros((ious.shape[0]))
if ious.size == 0:
tmp_ious = np.hstack((tmp_ious, gt_ious))
continue
for j in range(ious.shape[0]):
gt_max_overlaps = ious.argmax(axis=1)
max_ious = ious[np.arange(0, ious.shape[0]), gt_max_overlaps]
gt_idx = max_ious.argmax()
gt_ious[j] = max_ious[gt_idx]
box_idx = gt_max_overlaps[gt_idx]
ious[gt_idx, :] = -1
ious[:, box_idx] = -1
tmp_ious = np.hstack((tmp_ious, gt_ious))
_ious[k, :] = tmp_ious
_ious = np.fliplr(np.sort(_ious, axis=1))
recalls = np.zeros((proposal_nums.size, thrs.size))
for i, thr in enumerate(thrs):
recalls[:, i] = (_ious >= thr).sum(axis=1) / float(total_gt_num)
return recalls
def set_recall_param(proposal_nums, iou_thrs):
"""Check proposal_nums and iou_thrs and set correct format."""
if isinstance(proposal_nums, Sequence):
_proposal_nums = np.array(proposal_nums)
elif isinstance(proposal_nums, int):
_proposal_nums = np.array([proposal_nums])
else:
_proposal_nums = proposal_nums
if iou_thrs is None:
_iou_thrs = np.array([0.5])
elif isinstance(iou_thrs, Sequence):
_iou_thrs = np.array(iou_thrs)
elif isinstance(iou_thrs, float):
_iou_thrs = np.array([iou_thrs])
else:
_iou_thrs = iou_thrs
return _proposal_nums, _iou_thrs
def eval_recalls(gts,
proposals,
proposal_nums=None,
iou_thrs=0.5,
logger=None):
"""Calculate recalls.
Args:
gts (list[ndarray]): a list of arrays of shape (n, 4)
proposals (list[ndarray]): a list of arrays of shape (k, 4) or (k, 5)
proposal_nums (int | Sequence[int]): Top N proposals to be evaluated.
iou_thrs (float | Sequence[float]): IoU thresholds. Default: 0.5.
logger (logging.Logger | str | None): The way to print the recall
summary. See `mmcv.utils.print_log()` for details. Default: None.
Returns:
ndarray: recalls of different ious and proposal nums
"""
img_num = len(gts)
assert img_num == len(proposals)
proposal_nums, iou_thrs = set_recall_param(proposal_nums, iou_thrs)
all_ious = []
for i in range(img_num):
if proposals[i].ndim == 2 and proposals[i].shape[1] == 5:
scores = proposals[i][:, 4]
sort_idx = np.argsort(scores)[::-1]
img_proposal = proposals[i][sort_idx, :]
else:
img_proposal = proposals[i]
prop_num = min(img_proposal.shape[0], proposal_nums[-1])
if gts[i] is None or gts[i].shape[0] == 0:
ious = np.zeros((0, img_proposal.shape[0]), dtype=np.float32)
else:
ious = bbox_overlaps(gts[i], img_proposal[:prop_num, :4])
all_ious.append(ious)
all_ious = np.array(all_ious)
recalls = _recalls(all_ious, proposal_nums, iou_thrs)
print_recall_summary(recalls, proposal_nums, iou_thrs, logger=logger)
return recalls
def print_recall_summary(recalls,
proposal_nums,
iou_thrs,
row_idxs=None,
col_idxs=None,
logger=None):
"""Print recalls in a table.
Args:
recalls (ndarray): calculated from `bbox_recalls`
proposal_nums (ndarray or list): top N proposals
iou_thrs (ndarray or list): iou thresholds
row_idxs (ndarray): which rows(proposal nums) to print
col_idxs (ndarray): which cols(iou thresholds) to print
logger (logging.Logger | str | None): The way to print the recall
summary. See `mmcv.utils.print_log()` for details. Default: None.
"""
proposal_nums = np.array(proposal_nums, dtype=np.int32)
iou_thrs = np.array(iou_thrs)
if row_idxs is None:
row_idxs = np.arange(proposal_nums.size)
if col_idxs is None:
col_idxs = np.arange(iou_thrs.size)
row_header = [''] + iou_thrs[col_idxs].tolist()
table_data = [row_header]
for i, num in enumerate(proposal_nums[row_idxs]):
row = [f'{val:.3f}' for val in recalls[row_idxs[i], col_idxs].tolist()]
row.insert(0, num)
table_data.append(row)
table = AsciiTable(table_data)
print_log('\n' + table.table, logger=logger)
def plot_num_recall(recalls, proposal_nums):
"""Plot Proposal_num-Recalls curve.
Args:
recalls(ndarray or list): shape (k,)
proposal_nums(ndarray or list): same shape as `recalls`
"""
if isinstance(proposal_nums, np.ndarray):
_proposal_nums = proposal_nums.tolist()
else:
_proposal_nums = proposal_nums
if isinstance(recalls, np.ndarray):
_recalls = recalls.tolist()
else:
_recalls = recalls
import matplotlib.pyplot as plt
f = plt.figure()
plt.plot([0] + _proposal_nums, [0] + _recalls)
plt.xlabel('Proposal num')
plt.ylabel('Recall')
plt.axis([0, proposal_nums.max(), 0, 1])
f.show()
def plot_iou_recall(recalls, iou_thrs):
"""Plot IoU-Recalls curve.
Args:
recalls(ndarray or list): shape (k,)
iou_thrs(ndarray or list): same shape as `recalls`
"""
if isinstance(iou_thrs, np.ndarray):
_iou_thrs = iou_thrs.tolist()
else:
_iou_thrs = iou_thrs
if isinstance(recalls, np.ndarray):
_recalls = recalls.tolist()
else:
_recalls = recalls
import matplotlib.pyplot as plt
f = plt.figure()
plt.plot(_iou_thrs + [1.0], _recalls + [0.])
plt.xlabel('IoU')
plt.ylabel('Recall')
plt.axis([iou_thrs.min(), 1, 0, 1])
f.show()
from .pytorch2onnx import (build_model_from_cfg,
generate_inputs_and_wrap_model,
preprocess_example_input)
__all__ = [
'build_model_from_cfg', 'generate_inputs_and_wrap_model',
'preprocess_example_input'
]
from functools import partial
import mmcv
import numpy as np
import torch
from mmcv.runner import load_checkpoint
def generate_inputs_and_wrap_model(config_path, checkpoint_path, input_config):
"""Prepare sample input and wrap model for ONNX export.
The ONNX export API only accept args, and all inputs should be
torch.Tensor or corresponding types (such as tuple of tensor).
So we should call this function before exporting. This function will:
1. generate corresponding inputs which are used to execute the model.
2. Wrap the model's forward function.
For example, the MMDet models' forward function has a parameter
``return_loss:bool``. As we want to set it as False while export API
supports neither bool type or kwargs. So we have to replace the forward
like: ``model.forward = partial(model.forward, return_loss=False)``
Args:
config_path (str): the OpenMMLab config for the model we want to
export to ONNX
checkpoint_path (str): Path to the corresponding checkpoint
input_config (dict): the exactly data in this dict depends on the
framework. For MMSeg, we can just declare the input shape,
and generate the dummy data accordingly. However, for MMDet,
we may pass the real img path, or the NMS will return None
as there is no legal bbox.
Returns:
tuple: (model, tensor_data) wrapped model which can be called by \
model(*tensor_data) and a list of inputs which are used to execute \
the model while exporting.
"""
model = build_model_from_cfg(config_path, checkpoint_path)
one_img, one_meta = preprocess_example_input(input_config)
tensor_data = [one_img]
model.forward = partial(
model.forward, img_metas=[[one_meta]], return_loss=False)
# pytorch has some bug in pytorch1.3, we have to fix it
# by replacing these existing op
opset_version = 11
# put the import within the function thus it will not cause import error
# when not using this function
try:
from mmcv.onnx.symbolic import register_extra_symbolics
except ModuleNotFoundError:
raise NotImplementedError('please update mmcv to version>=v1.0.4')
register_extra_symbolics(opset_version)
return model, tensor_data
def build_model_from_cfg(config_path, checkpoint_path):
"""Build a model from config and load the given checkpoint.
Args:
config_path (str): the OpenMMLab config for the model we want to
export to ONNX
checkpoint_path (str): Path to the corresponding checkpoint
Returns:
torch.nn.Module: the built model
"""
from mmdet.models import build_detector
cfg = mmcv.Config.fromfile(config_path)
# import modules from string list.
if cfg.get('custom_imports', None):
from mmcv.utils import import_modules_from_strings
import_modules_from_strings(**cfg['custom_imports'])
cfg.model.pretrained = None
cfg.data.test.test_mode = True
# build the model
cfg.model.train_cfg = None
model = build_detector(cfg.model, test_cfg=cfg.get('test_cfg'))
load_checkpoint(model, checkpoint_path, map_location='cpu')
model.cpu().eval()
return model
def preprocess_example_input(input_config):
"""Prepare an example input image for ``generate_inputs_and_wrap_model``.
Args:
input_config (dict): customized config describing the example input.
Returns:
tuple: (one_img, one_meta), tensor of the example input image and \
meta information for the example input image.
Examples:
>>> from mmdet.core.export import preprocess_example_input
>>> input_config = {
>>> 'input_shape': (1,3,224,224),
>>> 'input_path': 'demo/demo.jpg',
>>> 'normalize_cfg': {
>>> 'mean': (123.675, 116.28, 103.53),
>>> 'std': (58.395, 57.12, 57.375)
>>> }
>>> }
>>> one_img, one_meta = preprocess_example_input(input_config)
>>> print(one_img.shape)
torch.Size([1, 3, 224, 224])
>>> print(one_meta)
{'img_shape': (224, 224, 3),
'ori_shape': (224, 224, 3),
'pad_shape': (224, 224, 3),
'filename': '<demo>.png',
'scale_factor': 1.0,
'flip': False}
"""
input_path = input_config['input_path']
input_shape = input_config['input_shape']
one_img = mmcv.imread(input_path)
one_img = mmcv.imresize(one_img, input_shape[2:][::-1])
show_img = one_img.copy()
if 'normalize_cfg' in input_config.keys():
normalize_cfg = input_config['normalize_cfg']
mean = np.array(normalize_cfg['mean'], dtype=np.float32)
std = np.array(normalize_cfg['std'], dtype=np.float32)
one_img = mmcv.imnormalize(one_img, mean, std)
one_img = one_img.transpose(2, 0, 1)
one_img = torch.from_numpy(one_img).unsqueeze(0).float().requires_grad_(
True)
(_, C, H, W) = input_shape
one_meta = {
'img_shape': (H, W, C),
'ori_shape': (H, W, C),
'pad_shape': (H, W, C),
'filename': '<demo>.png',
'scale_factor': 1.0,
'flip': False,
'show_img': show_img,
}
return one_img, one_meta
from .deprecated_fp16_utils import \
DeprecatedFp16OptimizerHook as Fp16OptimizerHook
from .deprecated_fp16_utils import deprecated_auto_fp16 as auto_fp16
from .deprecated_fp16_utils import deprecated_force_fp32 as force_fp32
from .deprecated_fp16_utils import \
deprecated_wrap_fp16_model as wrap_fp16_model
__all__ = ['auto_fp16', 'force_fp32', 'Fp16OptimizerHook', 'wrap_fp16_model']
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment