"vscode:/vscode.git/clone" did not exist on "d8932c55e72f919cd92d0d019fb7fd0e01d857cd"
Commit b634945d authored by limm's avatar limm
Browse files

support v0.6

parent 5b3792fc
# Copyright (c) Facebook, Inc. and its affiliates.
"""
Wrappers around on some nn functions, mainly to support empty tensors.
Ideally, add support directly in PyTorch to empty tensors in those functions.
These can be removed once https://github.com/pytorch/pytorch/issues/12013
is implemented
"""
from typing import List, Optional
import torch
from torch.nn import functional as F
def shapes_to_tensor(x: List[int], device: Optional[torch.device] = None) -> torch.Tensor:
"""
Turn a list of integer scalars or integer Tensor scalars into a vector,
in a way that's both traceable and scriptable.
In tracing, `x` should be a list of scalar Tensor, so the output can trace to the inputs.
In scripting or eager, `x` should be a list of int.
"""
if torch.jit.is_scripting():
return torch.as_tensor(x, device=device)
if torch.jit.is_tracing():
assert all(
[isinstance(t, torch.Tensor) for t in x]
), "Shape should be tensor during tracing!"
# as_tensor should not be used in tracing because it records a constant
ret = torch.stack(x)
if ret.device != device: # avoid recording a hard-coded device if not necessary
ret = ret.to(device=device)
return ret
return torch.as_tensor(x, device=device)
def cat(tensors: List[torch.Tensor], dim: int = 0):
"""
Efficient version of torch.cat that avoids a copy if there is only a single element in a list
"""
assert isinstance(tensors, (list, tuple))
if len(tensors) == 1:
return tensors[0]
return torch.cat(tensors, dim)
def cross_entropy(input, target, *, reduction="mean", **kwargs):
"""
Same as `torch.nn.functional.cross_entropy`, but returns 0 (instead of nan)
for empty inputs.
"""
if target.numel() == 0 and reduction == "mean":
return input.sum() * 0.0 # connect the gradient
return F.cross_entropy(input, target, reduction=reduction, **kwargs)
class _NewEmptyTensorOp(torch.autograd.Function):
@staticmethod
def forward(ctx, x, new_shape):
ctx.shape = x.shape
return x.new_empty(new_shape)
@staticmethod
def backward(ctx, grad):
shape = ctx.shape
return _NewEmptyTensorOp.apply(grad, shape), None
class Conv2d(torch.nn.Conv2d):
"""
A wrapper around :class:`torch.nn.Conv2d` to support empty inputs and more features.
"""
def __init__(self, *args, **kwargs):
"""
Extra keyword arguments supported in addition to those in `torch.nn.Conv2d`:
Args:
norm (nn.Module, optional): a normalization layer
activation (callable(Tensor) -> Tensor): a callable activation function
It assumes that norm layer is used before activation.
"""
norm = kwargs.pop("norm", None)
activation = kwargs.pop("activation", None)
super().__init__(*args, **kwargs)
self.norm = norm
self.activation = activation
def forward(self, x):
# torchscript does not support SyncBatchNorm yet
# https://github.com/pytorch/pytorch/issues/40507
# and we skip these codes in torchscript since:
# 1. currently we only support torchscript in evaluation mode
# 2. features needed by exporting module to torchscript are added in PyTorch 1.6 or
# later version, `Conv2d` in these PyTorch versions has already supported empty inputs.
if not torch.jit.is_scripting():
if x.numel() == 0 and self.training:
# https://github.com/pytorch/pytorch/issues/12013
assert not isinstance(
self.norm, torch.nn.SyncBatchNorm
), "SyncBatchNorm does not support empty inputs!"
x = F.conv2d(
x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups
)
if self.norm is not None:
x = self.norm(x)
if self.activation is not None:
x = self.activation(x)
return x
ConvTranspose2d = torch.nn.ConvTranspose2d
BatchNorm2d = torch.nn.BatchNorm2d
interpolate = F.interpolate
Linear = torch.nn.Linear
def nonzero_tuple(x):
"""
A 'as_tuple=True' version of torch.nonzero to support torchscript.
because of https://github.com/pytorch/pytorch/issues/38718
"""
if torch.jit.is_scripting():
if x.dim() == 0:
return x.unsqueeze(0).nonzero().unbind(1)
return x.nonzero().unbind(1)
else:
return x.nonzero(as_tuple=True)
# Copyright (c) Facebook, Inc. and its affiliates.
"""
Model Zoo API for Detectron2: a collection of functions to create common model architectures
listed in `MODEL_ZOO.md <https://github.com/facebookresearch/detectron2/blob/master/MODEL_ZOO.md>`_,
and optionally load their pre-trained weights.
"""
from .model_zoo import get, get_config_file, get_checkpoint_url, get_config
__all__ = ["get_checkpoint_url", "get", "get_config_file", "get_config"]
# Copyright (c) Facebook, Inc. and its affiliates.
import os
from typing import Optional
import pkg_resources
import torch
from detectron2.checkpoint import DetectionCheckpointer
from detectron2.config import CfgNode, LazyConfig, get_cfg, instantiate
from detectron2.modeling import build_model
class _ModelZooUrls(object):
"""
Mapping from names to officially released Detectron2 pre-trained models.
"""
S3_PREFIX = "https://dl.fbaipublicfiles.com/detectron2/"
# format: {config_path.yaml} -> model_id/model_final_{commit}.pkl
CONFIG_PATH_TO_URL_SUFFIX = {
# COCO Detection with Faster R-CNN
"COCO-Detection/faster_rcnn_R_50_C4_1x": "137257644/model_final_721ade.pkl",
"COCO-Detection/faster_rcnn_R_50_DC5_1x": "137847829/model_final_51d356.pkl",
"COCO-Detection/faster_rcnn_R_50_FPN_1x": "137257794/model_final_b275ba.pkl",
"COCO-Detection/faster_rcnn_R_50_C4_3x": "137849393/model_final_f97cb7.pkl",
"COCO-Detection/faster_rcnn_R_50_DC5_3x": "137849425/model_final_68d202.pkl",
"COCO-Detection/faster_rcnn_R_50_FPN_3x": "137849458/model_final_280758.pkl",
"COCO-Detection/faster_rcnn_R_101_C4_3x": "138204752/model_final_298dad.pkl",
"COCO-Detection/faster_rcnn_R_101_DC5_3x": "138204841/model_final_3e0943.pkl",
"COCO-Detection/faster_rcnn_R_101_FPN_3x": "137851257/model_final_f6e8b1.pkl",
"COCO-Detection/faster_rcnn_X_101_32x8d_FPN_3x": "139173657/model_final_68b088.pkl",
# COCO Detection with RetinaNet
"COCO-Detection/retinanet_R_50_FPN_1x": "190397773/model_final_bfca0b.pkl",
"COCO-Detection/retinanet_R_50_FPN_3x": "190397829/model_final_5bd44e.pkl",
"COCO-Detection/retinanet_R_101_FPN_3x": "190397697/model_final_971ab9.pkl",
# COCO Detection with RPN and Fast R-CNN
"COCO-Detection/rpn_R_50_C4_1x": "137258005/model_final_450694.pkl",
"COCO-Detection/rpn_R_50_FPN_1x": "137258492/model_final_02ce48.pkl",
"COCO-Detection/fast_rcnn_R_50_FPN_1x": "137635226/model_final_e5f7ce.pkl",
# COCO Instance Segmentation Baselines with Mask R-CNN
"COCO-InstanceSegmentation/mask_rcnn_R_50_C4_1x": "137259246/model_final_9243eb.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_50_DC5_1x": "137260150/model_final_4f86c3.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_1x": "137260431/model_final_a54504.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_50_C4_3x": "137849525/model_final_4ce675.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_50_DC5_3x": "137849551/model_final_84107b.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x": "137849600/model_final_f10217.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_101_C4_3x": "138363239/model_final_a2914c.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_101_DC5_3x": "138363294/model_final_0464b7.pkl",
"COCO-InstanceSegmentation/mask_rcnn_R_101_FPN_3x": "138205316/model_final_a3ec72.pkl",
"COCO-InstanceSegmentation/mask_rcnn_X_101_32x8d_FPN_3x": "139653917/model_final_2d9806.pkl", # noqa
# New baselines using Large-Scale Jitter and Longer Training Schedule
"new_baselines/mask_rcnn_R_50_FPN_100ep_LSJ": "42047764/model_final_bb69de.pkl",
"new_baselines/mask_rcnn_R_50_FPN_200ep_LSJ": "42047638/model_final_89a8d3.pkl",
"new_baselines/mask_rcnn_R_50_FPN_400ep_LSJ": "42019571/model_final_14d201.pkl",
"new_baselines/mask_rcnn_R_101_FPN_100ep_LSJ": "42025812/model_final_4f7b58.pkl",
"new_baselines/mask_rcnn_R_101_FPN_200ep_LSJ": "42131867/model_final_0bb7ae.pkl",
"new_baselines/mask_rcnn_R_101_FPN_400ep_LSJ": "42073830/model_final_f96b26.pkl",
"new_baselines/mask_rcnn_regnetx_4gf_dds_FPN_100ep_LSJ": "42047771/model_final_b7fbab.pkl", # noqa
"new_baselines/mask_rcnn_regnetx_4gf_dds_FPN_200ep_LSJ": "42132721/model_final_5d87c1.pkl", # noqa
"new_baselines/mask_rcnn_regnetx_4gf_dds_FPN_400ep_LSJ": "42025447/model_final_f1362d.pkl", # noqa
"new_baselines/mask_rcnn_regnety_4gf_dds_FPN_100ep_LSJ": "42047784/model_final_6ba57e.pkl", # noqa
"new_baselines/mask_rcnn_regnety_4gf_dds_FPN_200ep_LSJ": "42047642/model_final_27b9c1.pkl", # noqa
"new_baselines/mask_rcnn_regnety_4gf_dds_FPN_400ep_LSJ": "42045954/model_final_ef3a80.pkl", # noqa
# COCO Person Keypoint Detection Baselines with Keypoint R-CNN
"COCO-Keypoints/keypoint_rcnn_R_50_FPN_1x": "137261548/model_final_04e291.pkl",
"COCO-Keypoints/keypoint_rcnn_R_50_FPN_3x": "137849621/model_final_a6e10b.pkl",
"COCO-Keypoints/keypoint_rcnn_R_101_FPN_3x": "138363331/model_final_997cc7.pkl",
"COCO-Keypoints/keypoint_rcnn_X_101_32x8d_FPN_3x": "139686956/model_final_5ad38f.pkl",
# COCO Panoptic Segmentation Baselines with Panoptic FPN
"COCO-PanopticSegmentation/panoptic_fpn_R_50_1x": "139514544/model_final_dbfeb4.pkl",
"COCO-PanopticSegmentation/panoptic_fpn_R_50_3x": "139514569/model_final_c10459.pkl",
"COCO-PanopticSegmentation/panoptic_fpn_R_101_3x": "139514519/model_final_cafdb1.pkl",
# LVIS Instance Segmentation Baselines with Mask R-CNN
"LVISv0.5-InstanceSegmentation/mask_rcnn_R_50_FPN_1x": "144219072/model_final_571f7c.pkl", # noqa
"LVISv0.5-InstanceSegmentation/mask_rcnn_R_101_FPN_1x": "144219035/model_final_824ab5.pkl", # noqa
"LVISv0.5-InstanceSegmentation/mask_rcnn_X_101_32x8d_FPN_1x": "144219108/model_final_5e3439.pkl", # noqa
# Cityscapes & Pascal VOC Baselines
"Cityscapes/mask_rcnn_R_50_FPN": "142423278/model_final_af9cf5.pkl",
"PascalVOC-Detection/faster_rcnn_R_50_C4": "142202221/model_final_b1acc2.pkl",
# Other Settings
"Misc/mask_rcnn_R_50_FPN_1x_dconv_c3-c5": "138602867/model_final_65c703.pkl",
"Misc/mask_rcnn_R_50_FPN_3x_dconv_c3-c5": "144998336/model_final_821d0b.pkl",
"Misc/cascade_mask_rcnn_R_50_FPN_1x": "138602847/model_final_e9d89b.pkl",
"Misc/cascade_mask_rcnn_R_50_FPN_3x": "144998488/model_final_480dd8.pkl",
"Misc/mask_rcnn_R_50_FPN_3x_syncbn": "169527823/model_final_3b3c51.pkl",
"Misc/mask_rcnn_R_50_FPN_3x_gn": "138602888/model_final_dc5d9e.pkl",
"Misc/scratch_mask_rcnn_R_50_FPN_3x_gn": "138602908/model_final_01ca85.pkl",
"Misc/scratch_mask_rcnn_R_50_FPN_9x_gn": "183808979/model_final_da7b4c.pkl",
"Misc/scratch_mask_rcnn_R_50_FPN_9x_syncbn": "184226666/model_final_5ce33e.pkl",
"Misc/panoptic_fpn_R_101_dconv_cascade_gn_3x": "139797668/model_final_be35db.pkl",
"Misc/cascade_mask_rcnn_X_152_32x8d_FPN_IN5k_gn_dconv": "18131413/model_0039999_e76410.pkl", # noqa
# D1 Comparisons
"Detectron1-Comparisons/faster_rcnn_R_50_FPN_noaug_1x": "137781054/model_final_7ab50c.pkl", # noqa
"Detectron1-Comparisons/mask_rcnn_R_50_FPN_noaug_1x": "137781281/model_final_62ca52.pkl", # noqa
"Detectron1-Comparisons/keypoint_rcnn_R_50_FPN_1x": "137781195/model_final_cce136.pkl",
}
@staticmethod
def query(config_path: str) -> Optional[str]:
"""
Args:
config_path: relative config filename
"""
name = config_path.replace(".yaml", "").replace(".py", "")
if name in _ModelZooUrls.CONFIG_PATH_TO_URL_SUFFIX:
suffix = _ModelZooUrls.CONFIG_PATH_TO_URL_SUFFIX[name]
return _ModelZooUrls.S3_PREFIX + name + "/" + suffix
return None
def get_checkpoint_url(config_path):
"""
Returns the URL to the model trained using the given config
Args:
config_path (str): config file name relative to detectron2's "configs/"
directory, e.g., "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_1x.yaml"
Returns:
str: a URL to the model
"""
url = _ModelZooUrls.query(config_path)
if url is None:
raise RuntimeError("Pretrained model for {} is not available!".format(config_path))
return url
def get_config_file(config_path):
"""
Returns path to a builtin config file.
Args:
config_path (str): config file name relative to detectron2's "configs/"
directory, e.g., "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_1x.yaml"
Returns:
str: the real path to the config file.
"""
cfg_file = pkg_resources.resource_filename(
"detectron2.model_zoo", os.path.join("configs", config_path)
)
if not os.path.exists(cfg_file):
raise RuntimeError("{} not available in Model Zoo!".format(config_path))
return cfg_file
def get_config(config_path, trained: bool = False):
"""
Returns a config object for a model in model zoo.
Args:
config_path (str): config file name relative to detectron2's "configs/"
directory, e.g., "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_1x.yaml"
trained (bool): If True, will set ``MODEL.WEIGHTS`` to trained model zoo weights.
If False, the checkpoint specified in the config file's ``MODEL.WEIGHTS`` is used
instead; this will typically (though not always) initialize a subset of weights using
an ImageNet pre-trained model, while randomly initializing the other weights.
Returns:
CfgNode or omegaconf.DictConfig: a config object
"""
cfg_file = get_config_file(config_path)
if cfg_file.endswith(".yaml"):
cfg = get_cfg()
cfg.merge_from_file(cfg_file)
if trained:
cfg.MODEL.WEIGHTS = get_checkpoint_url(config_path)
return cfg
elif cfg_file.endswith(".py"):
cfg = LazyConfig.load(cfg_file)
if trained:
url = get_checkpoint_url(config_path)
if "train" in cfg and "init_checkpoint" in cfg.train:
cfg.train.init_checkpoint = url
else:
raise NotImplementedError
return cfg
def get(config_path, trained: bool = False, device: Optional[str] = None):
"""
Get a model specified by relative path under Detectron2's official ``configs/`` directory.
Args:
config_path (str): config file name relative to detectron2's "configs/"
directory, e.g., "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_1x.yaml"
trained (bool): see :func:`get_config`.
device (str or None): overwrite the device in config, if given.
Returns:
nn.Module: a detectron2 model. Will be in training mode.
Example:
::
from detectron2 import model_zoo
model = model_zoo.get("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_1x.yaml", trained=True)
"""
cfg = get_config(config_path, trained)
if device is None and not torch.cuda.is_available():
device = "cpu"
if device is not None and isinstance(cfg, CfgNode):
cfg.MODEL.DEVICE = device
if isinstance(cfg, CfgNode):
model = build_model(cfg)
DetectionCheckpointer(model).load(cfg.MODEL.WEIGHTS)
else:
model = instantiate(cfg.model)
if device is not None:
model = model.to(device)
if "train" in cfg and "init_checkpoint" in cfg.train:
DetectionCheckpointer(model).load(cfg.train.init_checkpoint)
return model
# Copyright (c) Facebook, Inc. and its affiliates.
from detectron2.layers import ShapeSpec
from .anchor_generator import build_anchor_generator, ANCHOR_GENERATOR_REGISTRY
from .backbone import (
BACKBONE_REGISTRY,
FPN,
Backbone,
ResNet,
ResNetBlockBase,
build_backbone,
build_resnet_backbone,
make_stage,
)
from .meta_arch import (
META_ARCH_REGISTRY,
SEM_SEG_HEADS_REGISTRY,
GeneralizedRCNN,
PanopticFPN,
ProposalNetwork,
RetinaNet,
SemanticSegmentor,
build_model,
build_sem_seg_head,
)
from .postprocessing import detector_postprocess
from .proposal_generator import (
PROPOSAL_GENERATOR_REGISTRY,
build_proposal_generator,
RPN_HEAD_REGISTRY,
build_rpn_head,
)
from .roi_heads import (
ROI_BOX_HEAD_REGISTRY,
ROI_HEADS_REGISTRY,
ROI_KEYPOINT_HEAD_REGISTRY,
ROI_MASK_HEAD_REGISTRY,
ROIHeads,
StandardROIHeads,
BaseMaskRCNNHead,
BaseKeypointRCNNHead,
FastRCNNOutputLayers,
build_box_head,
build_keypoint_head,
build_mask_head,
build_roi_heads,
)
from .test_time_augmentation import DatasetMapperTTA, GeneralizedRCNNWithTTA
from .mmdet_wrapper import MMDetBackbone, MMDetDetector
_EXCLUDE = {"ShapeSpec"}
__all__ = [k for k in globals().keys() if k not in _EXCLUDE and not k.startswith("_")]
from detectron2.utils.env import fixup_module_metadata
fixup_module_metadata(__name__, globals(), __all__)
del fixup_module_metadata
# Copyright (c) Facebook, Inc. and its affiliates.
import collections
import math
from typing import List
import torch
from torch import nn
from detectron2.config import configurable
from detectron2.layers import ShapeSpec
from detectron2.structures import Boxes, RotatedBoxes
from detectron2.utils.registry import Registry
ANCHOR_GENERATOR_REGISTRY = Registry("ANCHOR_GENERATOR")
ANCHOR_GENERATOR_REGISTRY.__doc__ = """
Registry for modules that creates object detection anchors for feature maps.
The registered object will be called with `obj(cfg, input_shape)`.
"""
class BufferList(nn.Module):
"""
Similar to nn.ParameterList, but for buffers
"""
def __init__(self, buffers):
super().__init__()
for i, buffer in enumerate(buffers):
# Use non-persistent buffer so the values are not saved in checkpoint
self.register_buffer(str(i), buffer, persistent=False)
def __len__(self):
return len(self._buffers)
def __iter__(self):
return iter(self._buffers.values())
def _create_grid_offsets(size: List[int], stride: int, offset: float, device: torch.device):
grid_height, grid_width = size
shifts_x = torch.arange(
offset * stride, grid_width * stride, step=stride, dtype=torch.float32, device=device
)
shifts_y = torch.arange(
offset * stride, grid_height * stride, step=stride, dtype=torch.float32, device=device
)
shift_y, shift_x = torch.meshgrid(shifts_y, shifts_x)
shift_x = shift_x.reshape(-1)
shift_y = shift_y.reshape(-1)
return shift_x, shift_y
def _broadcast_params(params, num_features, name):
"""
If one size (or aspect ratio) is specified and there are multiple feature
maps, we "broadcast" anchors of that single size (or aspect ratio)
over all feature maps.
If params is list[float], or list[list[float]] with len(params) == 1, repeat
it num_features time.
Returns:
list[list[float]]: param for each feature
"""
assert isinstance(
params, collections.abc.Sequence
), f"{name} in anchor generator has to be a list! Got {params}."
assert len(params), f"{name} in anchor generator cannot be empty!"
if not isinstance(params[0], collections.abc.Sequence): # params is list[float]
return [params] * num_features
if len(params) == 1:
return list(params) * num_features
assert len(params) == num_features, (
f"Got {name} of length {len(params)} in anchor generator, "
f"but the number of input features is {num_features}!"
)
return params
@ANCHOR_GENERATOR_REGISTRY.register()
class DefaultAnchorGenerator(nn.Module):
"""
Compute anchors in the standard ways described in
"Faster R-CNN: Towards Real-Time Object Detection with Region Proposal Networks".
"""
box_dim: torch.jit.Final[int] = 4
"""
the dimension of each anchor box.
"""
@configurable
def __init__(self, *, sizes, aspect_ratios, strides, offset=0.5):
"""
This interface is experimental.
Args:
sizes (list[list[float]] or list[float]):
If ``sizes`` is list[list[float]], ``sizes[i]`` is the list of anchor sizes
(i.e. sqrt of anchor area) to use for the i-th feature map.
If ``sizes`` is list[float], ``sizes`` is used for all feature maps.
Anchor sizes are given in absolute lengths in units of
the input image; they do not dynamically scale if the input image size changes.
aspect_ratios (list[list[float]] or list[float]): list of aspect ratios
(i.e. height / width) to use for anchors. Same "broadcast" rule for `sizes` applies.
strides (list[int]): stride of each input feature.
offset (float): Relative offset between the center of the first anchor and the top-left
corner of the image. Value has to be in [0, 1).
Recommend to use 0.5, which means half stride.
"""
super().__init__()
self.strides = strides
self.num_features = len(self.strides)
sizes = _broadcast_params(sizes, self.num_features, "sizes")
aspect_ratios = _broadcast_params(aspect_ratios, self.num_features, "aspect_ratios")
self.cell_anchors = self._calculate_anchors(sizes, aspect_ratios)
self.offset = offset
assert 0.0 <= self.offset < 1.0, self.offset
@classmethod
def from_config(cls, cfg, input_shape: List[ShapeSpec]):
return {
"sizes": cfg.MODEL.ANCHOR_GENERATOR.SIZES,
"aspect_ratios": cfg.MODEL.ANCHOR_GENERATOR.ASPECT_RATIOS,
"strides": [x.stride for x in input_shape],
"offset": cfg.MODEL.ANCHOR_GENERATOR.OFFSET,
}
def _calculate_anchors(self, sizes, aspect_ratios):
cell_anchors = [
self.generate_cell_anchors(s, a).float() for s, a in zip(sizes, aspect_ratios)
]
return BufferList(cell_anchors)
@property
@torch.jit.unused
def num_cell_anchors(self):
"""
Alias of `num_anchors`.
"""
return self.num_anchors
@property
@torch.jit.unused
def num_anchors(self):
"""
Returns:
list[int]: Each int is the number of anchors at every pixel
location, on that feature map.
For example, if at every pixel we use anchors of 3 aspect
ratios and 5 sizes, the number of anchors is 15.
(See also ANCHOR_GENERATOR.SIZES and ANCHOR_GENERATOR.ASPECT_RATIOS in config)
In standard RPN models, `num_anchors` on every feature map is the same.
"""
return [len(cell_anchors) for cell_anchors in self.cell_anchors]
def _grid_anchors(self, grid_sizes: List[List[int]]):
"""
Returns:
list[Tensor]: #featuremap tensors, each is (#locations x #cell_anchors) x 4
"""
anchors = []
# buffers() not supported by torchscript. use named_buffers() instead
buffers: List[torch.Tensor] = [x[1] for x in self.cell_anchors.named_buffers()]
for size, stride, base_anchors in zip(grid_sizes, self.strides, buffers):
shift_x, shift_y = _create_grid_offsets(size, stride, self.offset, base_anchors.device)
shifts = torch.stack((shift_x, shift_y, shift_x, shift_y), dim=1)
anchors.append((shifts.view(-1, 1, 4) + base_anchors.view(1, -1, 4)).reshape(-1, 4))
return anchors
def generate_cell_anchors(self, sizes=(32, 64, 128, 256, 512), aspect_ratios=(0.5, 1, 2)):
"""
Generate a tensor storing canonical anchor boxes, which are all anchor
boxes of different sizes and aspect_ratios centered at (0, 0).
We can later build the set of anchors for a full feature map by
shifting and tiling these tensors (see `meth:_grid_anchors`).
Args:
sizes (tuple[float]):
aspect_ratios (tuple[float]]):
Returns:
Tensor of shape (len(sizes) * len(aspect_ratios), 4) storing anchor boxes
in XYXY format.
"""
# This is different from the anchor generator defined in the original Faster R-CNN
# code or Detectron. They yield the same AP, however the old version defines cell
# anchors in a less natural way with a shift relative to the feature grid and
# quantization that results in slightly different sizes for different aspect ratios.
# See also https://github.com/facebookresearch/Detectron/issues/227
anchors = []
for size in sizes:
area = size ** 2.0
for aspect_ratio in aspect_ratios:
# s * s = w * h
# a = h / w
# ... some algebra ...
# w = sqrt(s * s / a)
# h = a * w
w = math.sqrt(area / aspect_ratio)
h = aspect_ratio * w
x0, y0, x1, y1 = -w / 2.0, -h / 2.0, w / 2.0, h / 2.0
anchors.append([x0, y0, x1, y1])
return torch.tensor(anchors)
def forward(self, features: List[torch.Tensor]):
"""
Args:
features (list[Tensor]): list of backbone feature maps on which to generate anchors.
Returns:
list[Boxes]: a list of Boxes containing all the anchors for each feature map
(i.e. the cell anchors repeated over all locations in the feature map).
The number of anchors of each feature map is Hi x Wi x num_cell_anchors,
where Hi, Wi are resolution of the feature map divided by anchor stride.
"""
grid_sizes = [feature_map.shape[-2:] for feature_map in features]
anchors_over_all_feature_maps = self._grid_anchors(grid_sizes)
return [Boxes(x) for x in anchors_over_all_feature_maps]
@ANCHOR_GENERATOR_REGISTRY.register()
class RotatedAnchorGenerator(nn.Module):
"""
Compute rotated anchors used by Rotated RPN (RRPN), described in
"Arbitrary-Oriented Scene Text Detection via Rotation Proposals".
"""
box_dim: int = 5
"""
the dimension of each anchor box.
"""
@configurable
def __init__(self, *, sizes, aspect_ratios, strides, angles, offset=0.5):
"""
This interface is experimental.
Args:
sizes (list[list[float]] or list[float]):
If sizes is list[list[float]], sizes[i] is the list of anchor sizes
(i.e. sqrt of anchor area) to use for the i-th feature map.
If sizes is list[float], the sizes are used for all feature maps.
Anchor sizes are given in absolute lengths in units of
the input image; they do not dynamically scale if the input image size changes.
aspect_ratios (list[list[float]] or list[float]): list of aspect ratios
(i.e. height / width) to use for anchors. Same "broadcast" rule for `sizes` applies.
strides (list[int]): stride of each input feature.
angles (list[list[float]] or list[float]): list of angles (in degrees CCW)
to use for anchors. Same "broadcast" rule for `sizes` applies.
offset (float): Relative offset between the center of the first anchor and the top-left
corner of the image. Value has to be in [0, 1).
Recommend to use 0.5, which means half stride.
"""
super().__init__()
self.strides = strides
self.num_features = len(self.strides)
sizes = _broadcast_params(sizes, self.num_features, "sizes")
aspect_ratios = _broadcast_params(aspect_ratios, self.num_features, "aspect_ratios")
angles = _broadcast_params(angles, self.num_features, "angles")
self.cell_anchors = self._calculate_anchors(sizes, aspect_ratios, angles)
self.offset = offset
assert 0.0 <= self.offset < 1.0, self.offset
@classmethod
def from_config(cls, cfg, input_shape: List[ShapeSpec]):
return {
"sizes": cfg.MODEL.ANCHOR_GENERATOR.SIZES,
"aspect_ratios": cfg.MODEL.ANCHOR_GENERATOR.ASPECT_RATIOS,
"strides": [x.stride for x in input_shape],
"offset": cfg.MODEL.ANCHOR_GENERATOR.OFFSET,
"angles": cfg.MODEL.ANCHOR_GENERATOR.ANGLES,
}
def _calculate_anchors(self, sizes, aspect_ratios, angles):
cell_anchors = [
self.generate_cell_anchors(size, aspect_ratio, angle).float()
for size, aspect_ratio, angle in zip(sizes, aspect_ratios, angles)
]
return BufferList(cell_anchors)
@property
def num_cell_anchors(self):
"""
Alias of `num_anchors`.
"""
return self.num_anchors
@property
def num_anchors(self):
"""
Returns:
list[int]: Each int is the number of anchors at every pixel
location, on that feature map.
For example, if at every pixel we use anchors of 3 aspect
ratios, 2 sizes and 5 angles, the number of anchors is 30.
(See also ANCHOR_GENERATOR.SIZES, ANCHOR_GENERATOR.ASPECT_RATIOS
and ANCHOR_GENERATOR.ANGLES in config)
In standard RRPN models, `num_anchors` on every feature map is the same.
"""
return [len(cell_anchors) for cell_anchors in self.cell_anchors]
def _grid_anchors(self, grid_sizes):
anchors = []
for size, stride, base_anchors in zip(grid_sizes, self.strides, self.cell_anchors):
shift_x, shift_y = _create_grid_offsets(size, stride, self.offset, base_anchors.device)
zeros = torch.zeros_like(shift_x)
shifts = torch.stack((shift_x, shift_y, zeros, zeros, zeros), dim=1)
anchors.append((shifts.view(-1, 1, 5) + base_anchors.view(1, -1, 5)).reshape(-1, 5))
return anchors
def generate_cell_anchors(
self,
sizes=(32, 64, 128, 256, 512),
aspect_ratios=(0.5, 1, 2),
angles=(-90, -60, -30, 0, 30, 60, 90),
):
"""
Generate a tensor storing canonical anchor boxes, which are all anchor
boxes of different sizes, aspect_ratios, angles centered at (0, 0).
We can later build the set of anchors for a full feature map by
shifting and tiling these tensors (see `meth:_grid_anchors`).
Args:
sizes (tuple[float]):
aspect_ratios (tuple[float]]):
angles (tuple[float]]):
Returns:
Tensor of shape (len(sizes) * len(aspect_ratios) * len(angles), 5)
storing anchor boxes in (x_ctr, y_ctr, w, h, angle) format.
"""
anchors = []
for size in sizes:
area = size ** 2.0
for aspect_ratio in aspect_ratios:
# s * s = w * h
# a = h / w
# ... some algebra ...
# w = sqrt(s * s / a)
# h = a * w
w = math.sqrt(area / aspect_ratio)
h = aspect_ratio * w
anchors.extend([0, 0, w, h, a] for a in angles)
return torch.tensor(anchors)
def forward(self, features):
"""
Args:
features (list[Tensor]): list of backbone feature maps on which to generate anchors.
Returns:
list[RotatedBoxes]: a list of Boxes containing all the anchors for each feature map
(i.e. the cell anchors repeated over all locations in the feature map).
The number of anchors of each feature map is Hi x Wi x num_cell_anchors,
where Hi, Wi are resolution of the feature map divided by anchor stride.
"""
grid_sizes = [feature_map.shape[-2:] for feature_map in features]
anchors_over_all_feature_maps = self._grid_anchors(grid_sizes)
return [RotatedBoxes(x) for x in anchors_over_all_feature_maps]
def build_anchor_generator(cfg, input_shape):
"""
Built an anchor generator from `cfg.MODEL.ANCHOR_GENERATOR.NAME`.
"""
anchor_generator = cfg.MODEL.ANCHOR_GENERATOR.NAME
return ANCHOR_GENERATOR_REGISTRY.get(anchor_generator)(cfg, input_shape)
# Copyright (c) Facebook, Inc. and its affiliates.
from .build import build_backbone, BACKBONE_REGISTRY # noqa F401 isort:skip
from .backbone import Backbone
from .fpn import FPN
from .regnet import RegNet
from .resnet import (
BasicStem,
ResNet,
ResNetBlockBase,
build_resnet_backbone,
make_stage,
BottleneckBlock,
)
__all__ = [k for k in globals().keys() if not k.startswith("_")]
# TODO can expose more resnet blocks after careful consideration
# Copyright (c) Facebook, Inc. and its affiliates.
from abc import ABCMeta, abstractmethod
import torch.nn as nn
from detectron2.layers import ShapeSpec
__all__ = ["Backbone"]
class Backbone(nn.Module, metaclass=ABCMeta):
"""
Abstract base class for network backbones.
"""
def __init__(self):
"""
The `__init__` method of any subclass can specify its own set of arguments.
"""
super().__init__()
@abstractmethod
def forward(self):
"""
Subclasses must override this method, but adhere to the same return type.
Returns:
dict[str->Tensor]: mapping from feature name (e.g., "res2") to tensor
"""
pass
@property
def size_divisibility(self) -> int:
"""
Some backbones require the input height and width to be divisible by a
specific integer. This is typically true for encoder / decoder type networks
with lateral connection (e.g., FPN) for which feature maps need to match
dimension in the "bottom up" and "top down" paths. Set to 0 if no specific
input size divisibility is required.
"""
return 0
def output_shape(self):
"""
Returns:
dict[str->ShapeSpec]
"""
# this is a backward-compatible default
return {
name: ShapeSpec(
channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]
)
for name in self._out_features
}
# Copyright (c) Facebook, Inc. and its affiliates.
from detectron2.layers import ShapeSpec
from detectron2.utils.registry import Registry
from .backbone import Backbone
BACKBONE_REGISTRY = Registry("BACKBONE")
BACKBONE_REGISTRY.__doc__ = """
Registry for backbones, which extract feature maps from images
The registered object must be a callable that accepts two arguments:
1. A :class:`detectron2.config.CfgNode`
2. A :class:`detectron2.layers.ShapeSpec`, which contains the input shape specification.
Registered object must return instance of :class:`Backbone`.
"""
def build_backbone(cfg, input_shape=None):
"""
Build a backbone from `cfg.MODEL.BACKBONE.NAME`.
Returns:
an instance of :class:`Backbone`
"""
if input_shape is None:
input_shape = ShapeSpec(channels=len(cfg.MODEL.PIXEL_MEAN))
backbone_name = cfg.MODEL.BACKBONE.NAME
backbone = BACKBONE_REGISTRY.get(backbone_name)(cfg, input_shape)
assert isinstance(backbone, Backbone)
return backbone
# Copyright (c) Facebook, Inc. and its affiliates.
import math
import fvcore.nn.weight_init as weight_init
import torch
import torch.nn.functional as F
from torch import nn
from detectron2.layers import Conv2d, ShapeSpec, get_norm
from .backbone import Backbone
from .build import BACKBONE_REGISTRY
from .resnet import build_resnet_backbone
__all__ = ["build_resnet_fpn_backbone", "build_retinanet_resnet_fpn_backbone", "FPN"]
class FPN(Backbone):
"""
This module implements :paper:`FPN`.
It creates pyramid features built on top of some input feature maps.
"""
_fuse_type: torch.jit.Final[str]
def __init__(
self, bottom_up, in_features, out_channels, norm="", top_block=None, fuse_type="sum"
):
"""
Args:
bottom_up (Backbone): module representing the bottom up subnetwork.
Must be a subclass of :class:`Backbone`. The multi-scale feature
maps generated by the bottom up network, and listed in `in_features`,
are used to generate FPN levels.
in_features (list[str]): names of the input feature maps coming
from the backbone to which FPN is attached. For example, if the
backbone produces ["res2", "res3", "res4"], any *contiguous* sublist
of these may be used; order must be from high to low resolution.
out_channels (int): number of channels in the output feature maps.
norm (str): the normalization to use.
top_block (nn.Module or None): if provided, an extra operation will
be performed on the output of the last (smallest resolution)
FPN output, and the result will extend the result list. The top_block
further downsamples the feature map. It must have an attribute
"num_levels", meaning the number of extra FPN levels added by
this block, and "in_feature", which is a string representing
its input feature (e.g., p5).
fuse_type (str): types for fusing the top down features and the lateral
ones. It can be "sum" (default), which sums up element-wise; or "avg",
which takes the element-wise mean of the two.
"""
super(FPN, self).__init__()
assert isinstance(bottom_up, Backbone)
assert in_features, in_features
# Feature map strides and channels from the bottom up network (e.g. ResNet)
input_shapes = bottom_up.output_shape()
strides = [input_shapes[f].stride for f in in_features]
in_channels_per_feature = [input_shapes[f].channels for f in in_features]
_assert_strides_are_log2_contiguous(strides)
lateral_convs = []
output_convs = []
use_bias = norm == ""
for idx, in_channels in enumerate(in_channels_per_feature):
lateral_norm = get_norm(norm, out_channels)
output_norm = get_norm(norm, out_channels)
lateral_conv = Conv2d(
in_channels, out_channels, kernel_size=1, bias=use_bias, norm=lateral_norm
)
output_conv = Conv2d(
out_channels,
out_channels,
kernel_size=3,
stride=1,
padding=1,
bias=use_bias,
norm=output_norm,
)
weight_init.c2_xavier_fill(lateral_conv)
weight_init.c2_xavier_fill(output_conv)
stage = int(math.log2(strides[idx]))
self.add_module("fpn_lateral{}".format(stage), lateral_conv)
self.add_module("fpn_output{}".format(stage), output_conv)
lateral_convs.append(lateral_conv)
output_convs.append(output_conv)
# Place convs into top-down order (from low to high resolution)
# to make the top-down computation in forward clearer.
self.lateral_convs = lateral_convs[::-1]
self.output_convs = output_convs[::-1]
self.top_block = top_block
self.in_features = tuple(in_features)
self.bottom_up = bottom_up
# Return feature names are "p<stage>", like ["p2", "p3", ..., "p6"]
self._out_feature_strides = {"p{}".format(int(math.log2(s))): s for s in strides}
# top block output feature maps.
if self.top_block is not None:
for s in range(stage, stage + self.top_block.num_levels):
self._out_feature_strides["p{}".format(s + 1)] = 2 ** (s + 1)
self._out_features = list(self._out_feature_strides.keys())
self._out_feature_channels = {k: out_channels for k in self._out_features}
self._size_divisibility = strides[-1]
assert fuse_type in {"avg", "sum"}
self._fuse_type = fuse_type
@property
def size_divisibility(self):
return self._size_divisibility
def forward(self, x):
"""
Args:
input (dict[str->Tensor]): mapping feature map name (e.g., "res5") to
feature map tensor for each feature level in high to low resolution order.
Returns:
dict[str->Tensor]:
mapping from feature map name to FPN feature map tensor
in high to low resolution order. Returned feature names follow the FPN
paper convention: "p<stage>", where stage has stride = 2 ** stage e.g.,
["p2", "p3", ..., "p6"].
"""
bottom_up_features = self.bottom_up(x)
results = []
prev_features = self.lateral_convs[0](bottom_up_features[self.in_features[-1]])
results.append(self.output_convs[0](prev_features))
# Reverse feature maps into top-down order (from low to high resolution)
for idx, (lateral_conv, output_conv) in enumerate(
zip(self.lateral_convs, self.output_convs)
):
# Slicing of ModuleList is not supported https://github.com/pytorch/pytorch/issues/47336
# Therefore we loop over all modules but skip the first one
if idx > 0:
features = self.in_features[-idx - 1]
features = bottom_up_features[features]
top_down_features = F.interpolate(prev_features, scale_factor=2.0, mode="nearest")
lateral_features = lateral_conv(features)
prev_features = lateral_features + top_down_features
if self._fuse_type == "avg":
prev_features /= 2
results.insert(0, output_conv(prev_features))
if self.top_block is not None:
if self.top_block.in_feature in bottom_up_features:
top_block_in_feature = bottom_up_features[self.top_block.in_feature]
else:
top_block_in_feature = results[self._out_features.index(self.top_block.in_feature)]
results.extend(self.top_block(top_block_in_feature))
assert len(self._out_features) == len(results)
return {f: res for f, res in zip(self._out_features, results)}
def output_shape(self):
return {
name: ShapeSpec(
channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]
)
for name in self._out_features
}
def _assert_strides_are_log2_contiguous(strides):
"""
Assert that each stride is 2x times its preceding stride, i.e. "contiguous in log2".
"""
for i, stride in enumerate(strides[1:], 1):
assert stride == 2 * strides[i - 1], "Strides {} {} are not log2 contiguous".format(
stride, strides[i - 1]
)
class LastLevelMaxPool(nn.Module):
"""
This module is used in the original FPN to generate a downsampled
P6 feature from P5.
"""
def __init__(self):
super().__init__()
self.num_levels = 1
self.in_feature = "p5"
def forward(self, x):
return [F.max_pool2d(x, kernel_size=1, stride=2, padding=0)]
class LastLevelP6P7(nn.Module):
"""
This module is used in RetinaNet to generate extra layers, P6 and P7 from
C5 feature.
"""
def __init__(self, in_channels, out_channels, in_feature="res5"):
super().__init__()
self.num_levels = 2
self.in_feature = in_feature
self.p6 = nn.Conv2d(in_channels, out_channels, 3, 2, 1)
self.p7 = nn.Conv2d(out_channels, out_channels, 3, 2, 1)
for module in [self.p6, self.p7]:
weight_init.c2_xavier_fill(module)
def forward(self, c5):
p6 = self.p6(c5)
p7 = self.p7(F.relu(p6))
return [p6, p7]
@BACKBONE_REGISTRY.register()
def build_resnet_fpn_backbone(cfg, input_shape: ShapeSpec):
"""
Args:
cfg: a detectron2 CfgNode
Returns:
backbone (Backbone): backbone module, must be a subclass of :class:`Backbone`.
"""
bottom_up = build_resnet_backbone(cfg, input_shape)
in_features = cfg.MODEL.FPN.IN_FEATURES
out_channels = cfg.MODEL.FPN.OUT_CHANNELS
backbone = FPN(
bottom_up=bottom_up,
in_features=in_features,
out_channels=out_channels,
norm=cfg.MODEL.FPN.NORM,
top_block=LastLevelMaxPool(),
fuse_type=cfg.MODEL.FPN.FUSE_TYPE,
)
return backbone
@BACKBONE_REGISTRY.register()
def build_retinanet_resnet_fpn_backbone(cfg, input_shape: ShapeSpec):
"""
Args:
cfg: a detectron2 CfgNode
Returns:
backbone (Backbone): backbone module, must be a subclass of :class:`Backbone`.
"""
bottom_up = build_resnet_backbone(cfg, input_shape)
in_features = cfg.MODEL.FPN.IN_FEATURES
out_channels = cfg.MODEL.FPN.OUT_CHANNELS
in_channels_p6p7 = bottom_up.output_shape()["res5"].channels
backbone = FPN(
bottom_up=bottom_up,
in_features=in_features,
out_channels=out_channels,
norm=cfg.MODEL.FPN.NORM,
top_block=LastLevelP6P7(in_channels_p6p7, out_channels),
fuse_type=cfg.MODEL.FPN.FUSE_TYPE,
)
return backbone
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
Implementation of RegNet models from :paper:`dds` and :paper:`scaling`.
This code is adapted from https://github.com/facebookresearch/pycls with minimal modifications.
Some code duplication exists between RegNet and ResNets (e.g., ResStem) in order to simplify
model loading.
"""
import numpy as np
from torch import nn
from detectron2.layers import CNNBlockBase, ShapeSpec, get_norm
from .backbone import Backbone
__all__ = [
"AnyNet",
"RegNet",
"ResStem",
"SimpleStem",
"VanillaBlock",
"ResBasicBlock",
"ResBottleneckBlock",
]
def conv2d(w_in, w_out, k, *, stride=1, groups=1, bias=False):
"""Helper for building a conv2d layer."""
assert k % 2 == 1, "Only odd size kernels supported to avoid padding issues."
s, p, g, b = stride, (k - 1) // 2, groups, bias
return nn.Conv2d(w_in, w_out, k, stride=s, padding=p, groups=g, bias=b)
def gap2d():
"""Helper for building a global average pooling layer."""
return nn.AdaptiveAvgPool2d((1, 1))
def pool2d(k, *, stride=1):
"""Helper for building a pool2d layer."""
assert k % 2 == 1, "Only odd size kernels supported to avoid padding issues."
return nn.MaxPool2d(k, stride=stride, padding=(k - 1) // 2)
def init_weights(m):
"""Performs ResNet-style weight initialization."""
if isinstance(m, nn.Conv2d):
# Note that there is no bias due to BN
fan_out = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data.normal_(mean=0.0, std=np.sqrt(2.0 / fan_out))
elif isinstance(m, nn.BatchNorm2d):
m.weight.data.fill_(1.0)
m.bias.data.zero_()
elif isinstance(m, nn.Linear):
m.weight.data.normal_(mean=0.0, std=0.01)
m.bias.data.zero_()
class ResStem(CNNBlockBase):
"""ResNet stem for ImageNet: 7x7, BN, AF, MaxPool."""
def __init__(self, w_in, w_out, norm, activation_class):
super().__init__(w_in, w_out, 4)
self.conv = conv2d(w_in, w_out, 7, stride=2)
self.bn = get_norm(norm, w_out)
self.af = activation_class()
self.pool = pool2d(3, stride=2)
def forward(self, x):
for layer in self.children():
x = layer(x)
return x
class SimpleStem(CNNBlockBase):
"""Simple stem for ImageNet: 3x3, BN, AF."""
def __init__(self, w_in, w_out, norm, activation_class):
super().__init__(w_in, w_out, 2)
self.conv = conv2d(w_in, w_out, 3, stride=2)
self.bn = get_norm(norm, w_out)
self.af = activation_class()
def forward(self, x):
for layer in self.children():
x = layer(x)
return x
class SE(nn.Module):
"""Squeeze-and-Excitation (SE) block: AvgPool, FC, Act, FC, Sigmoid."""
def __init__(self, w_in, w_se, activation_class):
super().__init__()
self.avg_pool = gap2d()
self.f_ex = nn.Sequential(
conv2d(w_in, w_se, 1, bias=True),
activation_class(),
conv2d(w_se, w_in, 1, bias=True),
nn.Sigmoid(),
)
def forward(self, x):
return x * self.f_ex(self.avg_pool(x))
class VanillaBlock(CNNBlockBase):
"""Vanilla block: [3x3 conv, BN, Relu] x2."""
def __init__(self, w_in, w_out, stride, norm, activation_class, _params):
super().__init__(w_in, w_out, stride)
self.a = conv2d(w_in, w_out, 3, stride=stride)
self.a_bn = get_norm(norm, w_out)
self.a_af = activation_class()
self.b = conv2d(w_out, w_out, 3)
self.b_bn = get_norm(norm, w_out)
self.b_af = activation_class()
def forward(self, x):
for layer in self.children():
x = layer(x)
return x
class BasicTransform(nn.Module):
"""Basic transformation: [3x3 conv, BN, Relu] x2."""
def __init__(self, w_in, w_out, stride, norm, activation_class, _params):
super().__init__()
self.a = conv2d(w_in, w_out, 3, stride=stride)
self.a_bn = get_norm(norm, w_out)
self.a_af = activation_class()
self.b = conv2d(w_out, w_out, 3)
self.b_bn = get_norm(norm, w_out)
self.b_bn.final_bn = True
def forward(self, x):
for layer in self.children():
x = layer(x)
return x
class ResBasicBlock(CNNBlockBase):
"""Residual basic block: x + f(x), f = basic transform."""
def __init__(self, w_in, w_out, stride, norm, activation_class, params):
super().__init__(w_in, w_out, stride)
self.proj, self.bn = None, None
if (w_in != w_out) or (stride != 1):
self.proj = conv2d(w_in, w_out, 1, stride=stride)
self.bn = get_norm(norm, w_out)
self.f = BasicTransform(w_in, w_out, stride, norm, activation_class, params)
self.af = activation_class()
def forward(self, x):
x_p = self.bn(self.proj(x)) if self.proj else x
return self.af(x_p + self.f(x))
class BottleneckTransform(nn.Module):
"""Bottleneck transformation: 1x1, 3x3 [+SE], 1x1."""
def __init__(self, w_in, w_out, stride, norm, activation_class, params):
super().__init__()
w_b = int(round(w_out * params["bot_mul"]))
w_se = int(round(w_in * params["se_r"]))
groups = w_b // params["group_w"]
self.a = conv2d(w_in, w_b, 1)
self.a_bn = get_norm(norm, w_b)
self.a_af = activation_class()
self.b = conv2d(w_b, w_b, 3, stride=stride, groups=groups)
self.b_bn = get_norm(norm, w_b)
self.b_af = activation_class()
self.se = SE(w_b, w_se, activation_class) if w_se else None
self.c = conv2d(w_b, w_out, 1)
self.c_bn = get_norm(norm, w_out)
self.c_bn.final_bn = True
def forward(self, x):
for layer in self.children():
x = layer(x)
return x
class ResBottleneckBlock(CNNBlockBase):
"""Residual bottleneck block: x + f(x), f = bottleneck transform."""
def __init__(self, w_in, w_out, stride, norm, activation_class, params):
super().__init__(w_in, w_out, stride)
self.proj, self.bn = None, None
if (w_in != w_out) or (stride != 1):
self.proj = conv2d(w_in, w_out, 1, stride=stride)
self.bn = get_norm(norm, w_out)
self.f = BottleneckTransform(w_in, w_out, stride, norm, activation_class, params)
self.af = activation_class()
def forward(self, x):
x_p = self.bn(self.proj(x)) if self.proj else x
return self.af(x_p + self.f(x))
class AnyStage(nn.Module):
"""AnyNet stage (sequence of blocks w/ the same output shape)."""
def __init__(self, w_in, w_out, stride, d, block_class, norm, activation_class, params):
super().__init__()
for i in range(d):
block = block_class(w_in, w_out, stride, norm, activation_class, params)
self.add_module("b{}".format(i + 1), block)
stride, w_in = 1, w_out
def forward(self, x):
for block in self.children():
x = block(x)
return x
class AnyNet(Backbone):
"""AnyNet model. See :paper:`dds`."""
def __init__(
self,
*,
stem_class,
stem_width,
block_class,
depths,
widths,
group_widths,
strides,
bottleneck_ratios,
se_ratio,
activation_class,
freeze_at=0,
norm="BN",
out_features=None,
):
"""
Args:
stem_class (callable): A callable taking 4 arguments (channels in, channels out,
normalization, callable returning an activation function) that returns another
callable implementing the stem module.
stem_width (int): The number of output channels that the stem produces.
block_class (callable): A callable taking 6 arguments (channels in, channels out,
stride, normalization, callable returning an activation function, a dict of
block-specific parameters) that returns another callable implementing the repeated
block module.
depths (list[int]): Number of blocks in each stage.
widths (list[int]): For each stage, the number of output channels of each block.
group_widths (list[int]): For each stage, the number of channels per group in group
convolution, if the block uses group convolution.
strides (list[int]): The stride that each network stage applies to its input.
bottleneck_ratios (list[float]): For each stage, the ratio of the number of bottleneck
channels to the number of block input channels (or, equivalently, output channels),
if the block uses a bottleneck.
se_ratio (float): The ratio of the number of channels used inside the squeeze-excitation
(SE) module to it number of input channels, if SE the block uses SE.
activation_class (callable): A callable taking no arguments that returns another
callable implementing an activation function.
freeze_at (int): The number of stages at the beginning to freeze.
see :meth:`freeze` for detailed explanation.
norm (str or callable): normalization for all conv layers.
See :func:`layers.get_norm` for supported format.
out_features (list[str]): name of the layers whose outputs should
be returned in forward. RegNet's use "stem" and "s1", "s2", etc for the stages after
the stem. If None, will return the output of the last layer.
"""
super().__init__()
self.stem = stem_class(3, stem_width, norm, activation_class)
current_stride = self.stem.stride
self._out_feature_strides = {"stem": current_stride}
self._out_feature_channels = {"stem": self.stem.out_channels}
self.stages_and_names = []
prev_w = stem_width
for i, (d, w, s, b, g) in enumerate(
zip(depths, widths, strides, bottleneck_ratios, group_widths)
):
params = {"bot_mul": b, "group_w": g, "se_r": se_ratio}
stage = AnyStage(prev_w, w, s, d, block_class, norm, activation_class, params)
name = "s{}".format(i + 1)
self.add_module(name, stage)
self.stages_and_names.append((stage, name))
self._out_feature_strides[name] = current_stride = int(
current_stride * np.prod([k.stride for k in stage.children()])
)
self._out_feature_channels[name] = list(stage.children())[-1].out_channels
prev_w = w
self.apply(init_weights)
if out_features is None:
out_features = [name]
self._out_features = out_features
assert len(self._out_features)
children = [x[0] for x in self.named_children()]
for out_feature in self._out_features:
assert out_feature in children, "Available children: {} does not include {}".format(
", ".join(children), out_feature
)
self.freeze(freeze_at)
def forward(self, x):
"""
Args:
x: Tensor of shape (N,C,H,W). H, W must be a multiple of ``self.size_divisibility``.
Returns:
dict[str->Tensor]: names and the corresponding features
"""
assert x.dim() == 4, f"Model takes an input of shape (N, C, H, W). Got {x.shape} instead!"
outputs = {}
x = self.stem(x)
if "stem" in self._out_features:
outputs["stem"] = x
for stage, name in self.stages_and_names:
x = stage(x)
if name in self._out_features:
outputs[name] = x
return outputs
def output_shape(self):
return {
name: ShapeSpec(
channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]
)
for name in self._out_features
}
def freeze(self, freeze_at=0):
"""
Freeze the first several stages of the model. Commonly used in fine-tuning.
Layers that produce the same feature map spatial size are defined as one
"stage" by :paper:`FPN`.
Args:
freeze_at (int): number of stages to freeze.
`1` means freezing the stem. `2` means freezing the stem and
one residual stage, etc.
Returns:
nn.Module: this model itself
"""
if freeze_at >= 1:
self.stem.freeze()
for idx, (stage, _) in enumerate(self.stages_and_names, start=2):
if freeze_at >= idx:
for block in stage.children():
block.freeze()
return self
def adjust_block_compatibility(ws, bs, gs):
"""Adjusts the compatibility of widths, bottlenecks, and groups."""
assert len(ws) == len(bs) == len(gs)
assert all(w > 0 and b > 0 and g > 0 for w, b, g in zip(ws, bs, gs))
vs = [int(max(1, w * b)) for w, b in zip(ws, bs)]
gs = [int(min(g, v)) for g, v in zip(gs, vs)]
ms = [np.lcm(g, b) if b > 1 else g for g, b in zip(gs, bs)]
vs = [max(m, int(round(v / m) * m)) for v, m in zip(vs, ms)]
ws = [int(v / b) for v, b in zip(vs, bs)]
assert all(w * b % g == 0 for w, b, g in zip(ws, bs, gs))
return ws, bs, gs
def generate_regnet_parameters(w_a, w_0, w_m, d, q=8):
"""Generates per stage widths and depths from RegNet parameters."""
assert w_a >= 0 and w_0 > 0 and w_m > 1 and w_0 % q == 0
# Generate continuous per-block ws
ws_cont = np.arange(d) * w_a + w_0
# Generate quantized per-block ws
ks = np.round(np.log(ws_cont / w_0) / np.log(w_m))
ws_all = w_0 * np.power(w_m, ks)
ws_all = np.round(np.divide(ws_all, q)).astype(int) * q
# Generate per stage ws and ds (assumes ws_all are sorted)
ws, ds = np.unique(ws_all, return_counts=True)
# Compute number of actual stages and total possible stages
num_stages, total_stages = len(ws), ks.max() + 1
# Convert numpy arrays to lists and return
ws, ds, ws_all, ws_cont = (x.tolist() for x in (ws, ds, ws_all, ws_cont))
return ws, ds, num_stages, total_stages, ws_all, ws_cont
class RegNet(AnyNet):
"""RegNet model. See :paper:`dds`."""
def __init__(
self,
*,
stem_class,
stem_width,
block_class,
depth,
w_a,
w_0,
w_m,
group_width,
stride=2,
bottleneck_ratio=1.0,
se_ratio=0.0,
activation_class=None,
freeze_at=0,
norm="BN",
out_features=None,
):
"""
Build a RegNet from the parameterization described in :paper:`dds` Section 3.3.
Args:
See :class:`AnyNet` for arguments that are not listed here.
depth (int): Total number of blocks in the RegNet.
w_a (float): Factor by which block width would increase prior to quantizing block widths
by stage. See :paper:`dds` Section 3.3.
w_0 (int): Initial block width. See :paper:`dds` Section 3.3.
w_m (float): Parameter controlling block width quantization.
See :paper:`dds` Section 3.3.
group_width (int): Number of channels per group in group convolution, if the block uses
group convolution.
bottleneck_ratio (float): The ratio of the number of bottleneck channels to the number
of block input channels (or, equivalently, output channels), if the block uses a
bottleneck.
stride (int): The stride that each network stage applies to its input.
"""
ws, ds = generate_regnet_parameters(w_a, w_0, w_m, depth)[0:2]
ss = [stride for _ in ws]
bs = [bottleneck_ratio for _ in ws]
gs = [group_width for _ in ws]
ws, bs, gs = adjust_block_compatibility(ws, bs, gs)
def default_activation_class():
return nn.ReLU(inplace=True)
super().__init__(
stem_class=stem_class,
stem_width=stem_width,
block_class=block_class,
depths=ds,
widths=ws,
strides=ss,
group_widths=gs,
bottleneck_ratios=bs,
se_ratio=se_ratio,
activation_class=default_activation_class
if activation_class is None
else activation_class,
freeze_at=freeze_at,
norm=norm,
out_features=out_features,
)
# Copyright (c) Facebook, Inc. and its affiliates.
import numpy as np
import fvcore.nn.weight_init as weight_init
import torch
import torch.nn.functional as F
from torch import nn
from detectron2.layers import (
CNNBlockBase,
Conv2d,
DeformConv,
ModulatedDeformConv,
ShapeSpec,
get_norm,
)
from .backbone import Backbone
from .build import BACKBONE_REGISTRY
__all__ = [
"ResNetBlockBase",
"BasicBlock",
"BottleneckBlock",
"DeformBottleneckBlock",
"BasicStem",
"ResNet",
"make_stage",
"build_resnet_backbone",
]
class BasicBlock(CNNBlockBase):
"""
The basic residual block for ResNet-18 and ResNet-34 defined in :paper:`ResNet`,
with two 3x3 conv layers and a projection shortcut if needed.
"""
def __init__(self, in_channels, out_channels, *, stride=1, norm="BN"):
"""
Args:
in_channels (int): Number of input channels.
out_channels (int): Number of output channels.
stride (int): Stride for the first conv.
norm (str or callable): normalization for all conv layers.
See :func:`layers.get_norm` for supported format.
"""
super().__init__(in_channels, out_channels, stride)
if in_channels != out_channels:
self.shortcut = Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=stride,
bias=False,
norm=get_norm(norm, out_channels),
)
else:
self.shortcut = None
self.conv1 = Conv2d(
in_channels,
out_channels,
kernel_size=3,
stride=stride,
padding=1,
bias=False,
norm=get_norm(norm, out_channels),
)
self.conv2 = Conv2d(
out_channels,
out_channels,
kernel_size=3,
stride=1,
padding=1,
bias=False,
norm=get_norm(norm, out_channels),
)
for layer in [self.conv1, self.conv2, self.shortcut]:
if layer is not None: # shortcut can be None
weight_init.c2_msra_fill(layer)
def forward(self, x):
out = self.conv1(x)
out = F.relu_(out)
out = self.conv2(out)
if self.shortcut is not None:
shortcut = self.shortcut(x)
else:
shortcut = x
out += shortcut
out = F.relu_(out)
return out
class BottleneckBlock(CNNBlockBase):
"""
The standard bottleneck residual block used by ResNet-50, 101 and 152
defined in :paper:`ResNet`. It contains 3 conv layers with kernels
1x1, 3x3, 1x1, and a projection shortcut if needed.
"""
def __init__(
self,
in_channels,
out_channels,
*,
bottleneck_channels,
stride=1,
num_groups=1,
norm="BN",
stride_in_1x1=False,
dilation=1,
):
"""
Args:
bottleneck_channels (int): number of output channels for the 3x3
"bottleneck" conv layers.
num_groups (int): number of groups for the 3x3 conv layer.
norm (str or callable): normalization for all conv layers.
See :func:`layers.get_norm` for supported format.
stride_in_1x1 (bool): when stride>1, whether to put stride in the
first 1x1 convolution or the bottleneck 3x3 convolution.
dilation (int): the dilation rate of the 3x3 conv layer.
"""
super().__init__(in_channels, out_channels, stride)
if in_channels != out_channels:
self.shortcut = Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=stride,
bias=False,
norm=get_norm(norm, out_channels),
)
else:
self.shortcut = None
# The original MSRA ResNet models have stride in the first 1x1 conv
# The subsequent fb.torch.resnet and Caffe2 ResNe[X]t implementations have
# stride in the 3x3 conv
stride_1x1, stride_3x3 = (stride, 1) if stride_in_1x1 else (1, stride)
self.conv1 = Conv2d(
in_channels,
bottleneck_channels,
kernel_size=1,
stride=stride_1x1,
bias=False,
norm=get_norm(norm, bottleneck_channels),
)
self.conv2 = Conv2d(
bottleneck_channels,
bottleneck_channels,
kernel_size=3,
stride=stride_3x3,
padding=1 * dilation,
bias=False,
groups=num_groups,
dilation=dilation,
norm=get_norm(norm, bottleneck_channels),
)
self.conv3 = Conv2d(
bottleneck_channels,
out_channels,
kernel_size=1,
bias=False,
norm=get_norm(norm, out_channels),
)
for layer in [self.conv1, self.conv2, self.conv3, self.shortcut]:
if layer is not None: # shortcut can be None
weight_init.c2_msra_fill(layer)
# Zero-initialize the last normalization in each residual branch,
# so that at the beginning, the residual branch starts with zeros,
# and each residual block behaves like an identity.
# See Sec 5.1 in "Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour":
# "For BN layers, the learnable scaling coefficient γ is initialized
# to be 1, except for each residual block's last BN
# where γ is initialized to be 0."
# nn.init.constant_(self.conv3.norm.weight, 0)
# TODO this somehow hurts performance when training GN models from scratch.
# Add it as an option when we need to use this code to train a backbone.
def forward(self, x):
out = self.conv1(x)
out = F.relu_(out)
out = self.conv2(out)
out = F.relu_(out)
out = self.conv3(out)
if self.shortcut is not None:
shortcut = self.shortcut(x)
else:
shortcut = x
out += shortcut
out = F.relu_(out)
return out
class DeformBottleneckBlock(CNNBlockBase):
"""
Similar to :class:`BottleneckBlock`, but with :paper:`deformable conv <deformconv>`
in the 3x3 convolution.
"""
def __init__(
self,
in_channels,
out_channels,
*,
bottleneck_channels,
stride=1,
num_groups=1,
norm="BN",
stride_in_1x1=False,
dilation=1,
deform_modulated=False,
deform_num_groups=1,
):
super().__init__(in_channels, out_channels, stride)
self.deform_modulated = deform_modulated
if in_channels != out_channels:
self.shortcut = Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=stride,
bias=False,
norm=get_norm(norm, out_channels),
)
else:
self.shortcut = None
stride_1x1, stride_3x3 = (stride, 1) if stride_in_1x1 else (1, stride)
self.conv1 = Conv2d(
in_channels,
bottleneck_channels,
kernel_size=1,
stride=stride_1x1,
bias=False,
norm=get_norm(norm, bottleneck_channels),
)
if deform_modulated:
deform_conv_op = ModulatedDeformConv
# offset channels are 2 or 3 (if with modulated) * kernel_size * kernel_size
offset_channels = 27
else:
deform_conv_op = DeformConv
offset_channels = 18
self.conv2_offset = Conv2d(
bottleneck_channels,
offset_channels * deform_num_groups,
kernel_size=3,
stride=stride_3x3,
padding=1 * dilation,
dilation=dilation,
)
self.conv2 = deform_conv_op(
bottleneck_channels,
bottleneck_channels,
kernel_size=3,
stride=stride_3x3,
padding=1 * dilation,
bias=False,
groups=num_groups,
dilation=dilation,
deformable_groups=deform_num_groups,
norm=get_norm(norm, bottleneck_channels),
)
self.conv3 = Conv2d(
bottleneck_channels,
out_channels,
kernel_size=1,
bias=False,
norm=get_norm(norm, out_channels),
)
for layer in [self.conv1, self.conv2, self.conv3, self.shortcut]:
if layer is not None: # shortcut can be None
weight_init.c2_msra_fill(layer)
nn.init.constant_(self.conv2_offset.weight, 0)
nn.init.constant_(self.conv2_offset.bias, 0)
def forward(self, x):
out = self.conv1(x)
out = F.relu_(out)
if self.deform_modulated:
offset_mask = self.conv2_offset(out)
offset_x, offset_y, mask = torch.chunk(offset_mask, 3, dim=1)
offset = torch.cat((offset_x, offset_y), dim=1)
mask = mask.sigmoid()
out = self.conv2(out, offset, mask)
else:
offset = self.conv2_offset(out)
out = self.conv2(out, offset)
out = F.relu_(out)
out = self.conv3(out)
if self.shortcut is not None:
shortcut = self.shortcut(x)
else:
shortcut = x
out += shortcut
out = F.relu_(out)
return out
class BasicStem(CNNBlockBase):
"""
The standard ResNet stem (layers before the first residual block),
with a conv, relu and max_pool.
"""
def __init__(self, in_channels=3, out_channels=64, norm="BN"):
"""
Args:
norm (str or callable): norm after the first conv layer.
See :func:`layers.get_norm` for supported format.
"""
super().__init__(in_channels, out_channels, 4)
self.in_channels = in_channels
self.conv1 = Conv2d(
in_channels,
out_channels,
kernel_size=7,
stride=2,
padding=3,
bias=False,
norm=get_norm(norm, out_channels),
)
weight_init.c2_msra_fill(self.conv1)
def forward(self, x):
x = self.conv1(x)
x = F.relu_(x)
x = F.max_pool2d(x, kernel_size=3, stride=2, padding=1)
return x
class ResNet(Backbone):
"""
Implement :paper:`ResNet`.
"""
def __init__(self, stem, stages, num_classes=None, out_features=None, freeze_at=0):
"""
Args:
stem (nn.Module): a stem module
stages (list[list[CNNBlockBase]]): several (typically 4) stages,
each contains multiple :class:`CNNBlockBase`.
num_classes (None or int): if None, will not perform classification.
Otherwise, will create a linear layer.
out_features (list[str]): name of the layers whose outputs should
be returned in forward. Can be anything in "stem", "linear", or "res2" ...
If None, will return the output of the last layer.
freeze_at (int): The number of stages at the beginning to freeze.
see :meth:`freeze` for detailed explanation.
"""
super().__init__()
self.stem = stem
self.num_classes = num_classes
current_stride = self.stem.stride
self._out_feature_strides = {"stem": current_stride}
self._out_feature_channels = {"stem": self.stem.out_channels}
self.stage_names, self.stages = [], []
if out_features is not None:
# Avoid keeping unused layers in this module. They consume extra memory
# and may cause allreduce to fail
num_stages = max(
[{"res2": 1, "res3": 2, "res4": 3, "res5": 4}.get(f, 0) for f in out_features]
)
stages = stages[:num_stages]
for i, blocks in enumerate(stages):
assert len(blocks) > 0, len(blocks)
for block in blocks:
assert isinstance(block, CNNBlockBase), block
name = "res" + str(i + 2)
stage = nn.Sequential(*blocks)
self.add_module(name, stage)
self.stage_names.append(name)
self.stages.append(stage)
self._out_feature_strides[name] = current_stride = int(
current_stride * np.prod([k.stride for k in blocks])
)
self._out_feature_channels[name] = curr_channels = blocks[-1].out_channels
self.stage_names = tuple(self.stage_names) # Make it static for scripting
if num_classes is not None:
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.linear = nn.Linear(curr_channels, num_classes)
# Sec 5.1 in "Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour":
# "The 1000-way fully-connected layer is initialized by
# drawing weights from a zero-mean Gaussian with standard deviation of 0.01."
nn.init.normal_(self.linear.weight, std=0.01)
name = "linear"
if out_features is None:
out_features = [name]
self._out_features = out_features
assert len(self._out_features)
children = [x[0] for x in self.named_children()]
for out_feature in self._out_features:
assert out_feature in children, "Available children: {}".format(", ".join(children))
self.freeze(freeze_at)
def forward(self, x):
"""
Args:
x: Tensor of shape (N,C,H,W). H, W must be a multiple of ``self.size_divisibility``.
Returns:
dict[str->Tensor]: names and the corresponding features
"""
assert x.dim() == 4, f"ResNet takes an input of shape (N, C, H, W). Got {x.shape} instead!"
outputs = {}
x = self.stem(x)
if "stem" in self._out_features:
outputs["stem"] = x
for name, stage in zip(self.stage_names, self.stages):
x = stage(x)
if name in self._out_features:
outputs[name] = x
if self.num_classes is not None:
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.linear(x)
if "linear" in self._out_features:
outputs["linear"] = x
return outputs
def output_shape(self):
return {
name: ShapeSpec(
channels=self._out_feature_channels[name], stride=self._out_feature_strides[name]
)
for name in self._out_features
}
def freeze(self, freeze_at=0):
"""
Freeze the first several stages of the ResNet. Commonly used in
fine-tuning.
Layers that produce the same feature map spatial size are defined as one
"stage" by :paper:`FPN`.
Args:
freeze_at (int): number of stages to freeze.
`1` means freezing the stem. `2` means freezing the stem and
one residual stage, etc.
Returns:
nn.Module: this ResNet itself
"""
if freeze_at >= 1:
self.stem.freeze()
for idx, stage in enumerate(self.stages, start=2):
if freeze_at >= idx:
for block in stage.children():
block.freeze()
return self
@staticmethod
def make_stage(block_class, num_blocks, *, in_channels, out_channels, **kwargs):
"""
Create a list of blocks of the same type that forms one ResNet stage.
Args:
block_class (type): a subclass of CNNBlockBase that's used to create all blocks in this
stage. A module of this type must not change spatial resolution of inputs unless its
stride != 1.
num_blocks (int): number of blocks in this stage
in_channels (int): input channels of the entire stage.
out_channels (int): output channels of **every block** in the stage.
kwargs: other arguments passed to the constructor of
`block_class`. If the argument name is "xx_per_block", the
argument is a list of values to be passed to each block in the
stage. Otherwise, the same argument is passed to every block
in the stage.
Returns:
list[CNNBlockBase]: a list of block module.
Examples:
::
stage = ResNet.make_stage(
BottleneckBlock, 3, in_channels=16, out_channels=64,
bottleneck_channels=16, num_groups=1,
stride_per_block=[2, 1, 1],
dilations_per_block=[1, 1, 2]
)
Usually, layers that produce the same feature map spatial size are defined as one
"stage" (in :paper:`FPN`). Under such definition, ``stride_per_block[1:]`` should
all be 1.
"""
blocks = []
for i in range(num_blocks):
curr_kwargs = {}
for k, v in kwargs.items():
if k.endswith("_per_block"):
assert len(v) == num_blocks, (
f"Argument '{k}' of make_stage should have the "
f"same length as num_blocks={num_blocks}."
)
newk = k[: -len("_per_block")]
assert newk not in kwargs, f"Cannot call make_stage with both {k} and {newk}!"
curr_kwargs[newk] = v[i]
else:
curr_kwargs[k] = v
blocks.append(
block_class(in_channels=in_channels, out_channels=out_channels, **curr_kwargs)
)
in_channels = out_channels
return blocks
@staticmethod
def make_default_stages(depth, block_class=None, **kwargs):
"""
Created list of ResNet stages from pre-defined depth (one of 18, 34, 50, 101, 152).
If it doesn't create the ResNet variant you need, please use :meth:`make_stage`
instead for fine-grained customization.
Args:
depth (int): depth of ResNet
block_class (type): the CNN block class. Has to accept
`bottleneck_channels` argument for depth > 50.
By default it is BasicBlock or BottleneckBlock, based on the
depth.
kwargs:
other arguments to pass to `make_stage`. Should not contain
stride and channels, as they are predefined for each depth.
Returns:
list[list[CNNBlockBase]]: modules in all stages; see arguments of
:class:`ResNet.__init__`.
"""
num_blocks_per_stage = {
18: [2, 2, 2, 2],
34: [3, 4, 6, 3],
50: [3, 4, 6, 3],
101: [3, 4, 23, 3],
152: [3, 8, 36, 3],
}[depth]
if block_class is None:
block_class = BasicBlock if depth < 50 else BottleneckBlock
if depth < 50:
in_channels = [64, 64, 128, 256]
out_channels = [64, 128, 256, 512]
else:
in_channels = [64, 256, 512, 1024]
out_channels = [256, 512, 1024, 2048]
ret = []
for (n, s, i, o) in zip(num_blocks_per_stage, [1, 2, 2, 2], in_channels, out_channels):
if depth >= 50:
kwargs["bottleneck_channels"] = o // 4
ret.append(
ResNet.make_stage(
block_class=block_class,
num_blocks=n,
stride_per_block=[s] + [1] * (n - 1),
in_channels=i,
out_channels=o,
**kwargs,
)
)
return ret
ResNetBlockBase = CNNBlockBase
"""
Alias for backward compatibiltiy.
"""
def make_stage(*args, **kwargs):
"""
Deprecated alias for backward compatibiltiy.
"""
return ResNet.make_stage(*args, **kwargs)
@BACKBONE_REGISTRY.register()
def build_resnet_backbone(cfg, input_shape):
"""
Create a ResNet instance from config.
Returns:
ResNet: a :class:`ResNet` instance.
"""
# need registration of new blocks/stems?
norm = cfg.MODEL.RESNETS.NORM
stem = BasicStem(
in_channels=input_shape.channels,
out_channels=cfg.MODEL.RESNETS.STEM_OUT_CHANNELS,
norm=norm,
)
# fmt: off
freeze_at = cfg.MODEL.BACKBONE.FREEZE_AT
out_features = cfg.MODEL.RESNETS.OUT_FEATURES
depth = cfg.MODEL.RESNETS.DEPTH
num_groups = cfg.MODEL.RESNETS.NUM_GROUPS
width_per_group = cfg.MODEL.RESNETS.WIDTH_PER_GROUP
bottleneck_channels = num_groups * width_per_group
in_channels = cfg.MODEL.RESNETS.STEM_OUT_CHANNELS
out_channels = cfg.MODEL.RESNETS.RES2_OUT_CHANNELS
stride_in_1x1 = cfg.MODEL.RESNETS.STRIDE_IN_1X1
res5_dilation = cfg.MODEL.RESNETS.RES5_DILATION
deform_on_per_stage = cfg.MODEL.RESNETS.DEFORM_ON_PER_STAGE
deform_modulated = cfg.MODEL.RESNETS.DEFORM_MODULATED
deform_num_groups = cfg.MODEL.RESNETS.DEFORM_NUM_GROUPS
# fmt: on
assert res5_dilation in {1, 2}, "res5_dilation cannot be {}.".format(res5_dilation)
num_blocks_per_stage = {
18: [2, 2, 2, 2],
34: [3, 4, 6, 3],
50: [3, 4, 6, 3],
101: [3, 4, 23, 3],
152: [3, 8, 36, 3],
}[depth]
if depth in [18, 34]:
assert out_channels == 64, "Must set MODEL.RESNETS.RES2_OUT_CHANNELS = 64 for R18/R34"
assert not any(
deform_on_per_stage
), "MODEL.RESNETS.DEFORM_ON_PER_STAGE unsupported for R18/R34"
assert res5_dilation == 1, "Must set MODEL.RESNETS.RES5_DILATION = 1 for R18/R34"
assert num_groups == 1, "Must set MODEL.RESNETS.NUM_GROUPS = 1 for R18/R34"
stages = []
for idx, stage_idx in enumerate(range(2, 6)):
# res5_dilation is used this way as a convention in R-FCN & Deformable Conv paper
dilation = res5_dilation if stage_idx == 5 else 1
first_stride = 1 if idx == 0 or (stage_idx == 5 and dilation == 2) else 2
stage_kargs = {
"num_blocks": num_blocks_per_stage[idx],
"stride_per_block": [first_stride] + [1] * (num_blocks_per_stage[idx] - 1),
"in_channels": in_channels,
"out_channels": out_channels,
"norm": norm,
}
# Use BasicBlock for R18 and R34.
if depth in [18, 34]:
stage_kargs["block_class"] = BasicBlock
else:
stage_kargs["bottleneck_channels"] = bottleneck_channels
stage_kargs["stride_in_1x1"] = stride_in_1x1
stage_kargs["dilation"] = dilation
stage_kargs["num_groups"] = num_groups
if deform_on_per_stage[idx]:
stage_kargs["block_class"] = DeformBottleneckBlock
stage_kargs["deform_modulated"] = deform_modulated
stage_kargs["deform_num_groups"] = deform_num_groups
else:
stage_kargs["block_class"] = BottleneckBlock
blocks = ResNet.make_stage(**stage_kargs)
in_channels = out_channels
out_channels *= 2
bottleneck_channels *= 2
stages.append(blocks)
return ResNet(stem, stages, out_features=out_features, freeze_at=freeze_at)
# Copyright (c) Facebook, Inc. and its affiliates.
import math
from typing import List, Tuple
import torch
from fvcore.nn import giou_loss, smooth_l1_loss
from torch.nn import functional as F
from detectron2.layers import cat, ciou_loss, diou_loss
from detectron2.structures import Boxes
# Value for clamping large dw and dh predictions. The heuristic is that we clamp
# such that dw and dh are no larger than what would transform a 16px box into a
# 1000px box (based on a small anchor, 16px, and a typical image size, 1000px).
_DEFAULT_SCALE_CLAMP = math.log(1000.0 / 16)
__all__ = ["Box2BoxTransform", "Box2BoxTransformRotated", "Box2BoxTransformLinear"]
@torch.jit.script
class Box2BoxTransform(object):
"""
The box-to-box transform defined in R-CNN. The transformation is parameterized
by 4 deltas: (dx, dy, dw, dh). The transformation scales the box's width and height
by exp(dw), exp(dh) and shifts a box's center by the offset (dx * width, dy * height).
"""
def __init__(
self, weights: Tuple[float, float, float, float], scale_clamp: float = _DEFAULT_SCALE_CLAMP
):
"""
Args:
weights (4-element tuple): Scaling factors that are applied to the
(dx, dy, dw, dh) deltas. In Fast R-CNN, these were originally set
such that the deltas have unit variance; now they are treated as
hyperparameters of the system.
scale_clamp (float): When predicting deltas, the predicted box scaling
factors (dw and dh) are clamped such that they are <= scale_clamp.
"""
self.weights = weights
self.scale_clamp = scale_clamp
def get_deltas(self, src_boxes, target_boxes):
"""
Get box regression transformation deltas (dx, dy, dw, dh) that can be used
to transform the `src_boxes` into the `target_boxes`. That is, the relation
``target_boxes == self.apply_deltas(deltas, src_boxes)`` is true (unless
any delta is too large and is clamped).
Args:
src_boxes (Tensor): source boxes, e.g., object proposals
target_boxes (Tensor): target of the transformation, e.g., ground-truth
boxes.
"""
assert isinstance(src_boxes, torch.Tensor), type(src_boxes)
assert isinstance(target_boxes, torch.Tensor), type(target_boxes)
src_widths = src_boxes[:, 2] - src_boxes[:, 0]
src_heights = src_boxes[:, 3] - src_boxes[:, 1]
src_ctr_x = src_boxes[:, 0] + 0.5 * src_widths
src_ctr_y = src_boxes[:, 1] + 0.5 * src_heights
target_widths = target_boxes[:, 2] - target_boxes[:, 0]
target_heights = target_boxes[:, 3] - target_boxes[:, 1]
target_ctr_x = target_boxes[:, 0] + 0.5 * target_widths
target_ctr_y = target_boxes[:, 1] + 0.5 * target_heights
wx, wy, ww, wh = self.weights
dx = wx * (target_ctr_x - src_ctr_x) / src_widths
dy = wy * (target_ctr_y - src_ctr_y) / src_heights
dw = ww * torch.log(target_widths / src_widths)
dh = wh * torch.log(target_heights / src_heights)
deltas = torch.stack((dx, dy, dw, dh), dim=1)
assert (src_widths > 0).all().item(), "Input boxes to Box2BoxTransform are not valid!"
return deltas
def apply_deltas(self, deltas, boxes):
"""
Apply transformation `deltas` (dx, dy, dw, dh) to `boxes`.
Args:
deltas (Tensor): transformation deltas of shape (N, k*4), where k >= 1.
deltas[i] represents k potentially different class-specific
box transformations for the single box boxes[i].
boxes (Tensor): boxes to transform, of shape (N, 4)
"""
deltas = deltas.float() # ensure fp32 for decoding precision
boxes = boxes.to(deltas.dtype)
widths = boxes[:, 2] - boxes[:, 0]
heights = boxes[:, 3] - boxes[:, 1]
ctr_x = boxes[:, 0] + 0.5 * widths
ctr_y = boxes[:, 1] + 0.5 * heights
wx, wy, ww, wh = self.weights
dx = deltas[:, 0::4] / wx
dy = deltas[:, 1::4] / wy
dw = deltas[:, 2::4] / ww
dh = deltas[:, 3::4] / wh
# Prevent sending too large values into torch.exp()
dw = torch.clamp(dw, max=self.scale_clamp)
dh = torch.clamp(dh, max=self.scale_clamp)
pred_ctr_x = dx * widths[:, None] + ctr_x[:, None]
pred_ctr_y = dy * heights[:, None] + ctr_y[:, None]
pred_w = torch.exp(dw) * widths[:, None]
pred_h = torch.exp(dh) * heights[:, None]
x1 = pred_ctr_x - 0.5 * pred_w
y1 = pred_ctr_y - 0.5 * pred_h
x2 = pred_ctr_x + 0.5 * pred_w
y2 = pred_ctr_y + 0.5 * pred_h
pred_boxes = torch.stack((x1, y1, x2, y2), dim=-1)
return pred_boxes.reshape(deltas.shape)
@torch.jit.script
class Box2BoxTransformRotated(object):
"""
The box-to-box transform defined in Rotated R-CNN. The transformation is parameterized
by 5 deltas: (dx, dy, dw, dh, da). The transformation scales the box's width and height
by exp(dw), exp(dh), shifts a box's center by the offset (dx * width, dy * height),
and rotate a box's angle by da (radians).
Note: angles of deltas are in radians while angles of boxes are in degrees.
"""
def __init__(
self,
weights: Tuple[float, float, float, float, float],
scale_clamp: float = _DEFAULT_SCALE_CLAMP,
):
"""
Args:
weights (5-element tuple): Scaling factors that are applied to the
(dx, dy, dw, dh, da) deltas. These are treated as
hyperparameters of the system.
scale_clamp (float): When predicting deltas, the predicted box scaling
factors (dw and dh) are clamped such that they are <= scale_clamp.
"""
self.weights = weights
self.scale_clamp = scale_clamp
def get_deltas(self, src_boxes, target_boxes):
"""
Get box regression transformation deltas (dx, dy, dw, dh, da) that can be used
to transform the `src_boxes` into the `target_boxes`. That is, the relation
``target_boxes == self.apply_deltas(deltas, src_boxes)`` is true (unless
any delta is too large and is clamped).
Args:
src_boxes (Tensor): Nx5 source boxes, e.g., object proposals
target_boxes (Tensor): Nx5 target of the transformation, e.g., ground-truth
boxes.
"""
assert isinstance(src_boxes, torch.Tensor), type(src_boxes)
assert isinstance(target_boxes, torch.Tensor), type(target_boxes)
src_ctr_x, src_ctr_y, src_widths, src_heights, src_angles = torch.unbind(src_boxes, dim=1)
target_ctr_x, target_ctr_y, target_widths, target_heights, target_angles = torch.unbind(
target_boxes, dim=1
)
wx, wy, ww, wh, wa = self.weights
dx = wx * (target_ctr_x - src_ctr_x) / src_widths
dy = wy * (target_ctr_y - src_ctr_y) / src_heights
dw = ww * torch.log(target_widths / src_widths)
dh = wh * torch.log(target_heights / src_heights)
# Angles of deltas are in radians while angles of boxes are in degrees.
# the conversion to radians serve as a way to normalize the values
da = target_angles - src_angles
da = (da + 180.0) % 360.0 - 180.0 # make it in [-180, 180)
da *= wa * math.pi / 180.0
deltas = torch.stack((dx, dy, dw, dh, da), dim=1)
assert (
(src_widths > 0).all().item()
), "Input boxes to Box2BoxTransformRotated are not valid!"
return deltas
def apply_deltas(self, deltas, boxes):
"""
Apply transformation `deltas` (dx, dy, dw, dh, da) to `boxes`.
Args:
deltas (Tensor): transformation deltas of shape (N, k*5).
deltas[i] represents box transformation for the single box boxes[i].
boxes (Tensor): boxes to transform, of shape (N, 5)
"""
assert deltas.shape[1] % 5 == 0 and boxes.shape[1] == 5
boxes = boxes.to(deltas.dtype).unsqueeze(2)
ctr_x = boxes[:, 0]
ctr_y = boxes[:, 1]
widths = boxes[:, 2]
heights = boxes[:, 3]
angles = boxes[:, 4]
wx, wy, ww, wh, wa = self.weights
dx = deltas[:, 0::5] / wx
dy = deltas[:, 1::5] / wy
dw = deltas[:, 2::5] / ww
dh = deltas[:, 3::5] / wh
da = deltas[:, 4::5] / wa
# Prevent sending too large values into torch.exp()
dw = torch.clamp(dw, max=self.scale_clamp)
dh = torch.clamp(dh, max=self.scale_clamp)
pred_boxes = torch.zeros_like(deltas)
pred_boxes[:, 0::5] = dx * widths + ctr_x # x_ctr
pred_boxes[:, 1::5] = dy * heights + ctr_y # y_ctr
pred_boxes[:, 2::5] = torch.exp(dw) * widths # width
pred_boxes[:, 3::5] = torch.exp(dh) * heights # height
# Following original RRPN implementation,
# angles of deltas are in radians while angles of boxes are in degrees.
pred_angle = da * 180.0 / math.pi + angles
pred_angle = (pred_angle + 180.0) % 360.0 - 180.0 # make it in [-180, 180)
pred_boxes[:, 4::5] = pred_angle
return pred_boxes
class Box2BoxTransformLinear:
"""
The linear box-to-box transform defined in FCOS. The transformation is parameterized
by the distance from the center of (square) src box to 4 edges of the target box.
"""
def __init__(self, normalize_by_size=True):
"""
Args:
normalize_by_size: normalize deltas by the size of src (anchor) boxes.
"""
self.normalize_by_size = normalize_by_size
def get_deltas(self, src_boxes, target_boxes):
"""
Get box regression transformation deltas (dx1, dy1, dx2, dy2) that can be used
to transform the `src_boxes` into the `target_boxes`. That is, the relation
``target_boxes == self.apply_deltas(deltas, src_boxes)`` is true.
The center of src must be inside target boxes.
Args:
src_boxes (Tensor): square source boxes, e.g., anchors
target_boxes (Tensor): target of the transformation, e.g., ground-truth
boxes.
"""
assert isinstance(src_boxes, torch.Tensor), type(src_boxes)
assert isinstance(target_boxes, torch.Tensor), type(target_boxes)
src_ctr_x = 0.5 * (src_boxes[:, 0] + src_boxes[:, 2])
src_ctr_y = 0.5 * (src_boxes[:, 1] + src_boxes[:, 3])
target_l = src_ctr_x - target_boxes[:, 0]
target_t = src_ctr_y - target_boxes[:, 1]
target_r = target_boxes[:, 2] - src_ctr_x
target_b = target_boxes[:, 3] - src_ctr_y
deltas = torch.stack((target_l, target_t, target_r, target_b), dim=1)
if self.normalize_by_size:
stride = (src_boxes[:, 2] - src_boxes[:, 0]).unsqueeze(1)
deltas = deltas / stride
return deltas
def apply_deltas(self, deltas, boxes):
"""
Apply transformation `deltas` (dx1, dy1, dx2, dy2) to `boxes`.
Args:
deltas (Tensor): transformation deltas of shape (N, k*4), where k >= 1.
deltas[i] represents k potentially different class-specific
box transformations for the single box boxes[i].
boxes (Tensor): boxes to transform, of shape (N, 4)
"""
# Ensure the output is a valid box. See Sec 2.1 of https://arxiv.org/abs/2006.09214
deltas = F.relu(deltas)
boxes = boxes.to(deltas.dtype)
ctr_x = 0.5 * (boxes[:, 0] + boxes[:, 2])
ctr_y = 0.5 * (boxes[:, 1] + boxes[:, 3])
if self.normalize_by_size:
stride = (boxes[:, 2] - boxes[:, 0]).unsqueeze(1)
deltas = deltas * stride
l = deltas[:, 0::4]
t = deltas[:, 1::4]
r = deltas[:, 2::4]
b = deltas[:, 3::4]
pred_boxes = torch.zeros_like(deltas)
pred_boxes[:, 0::4] = ctr_x[:, None] - l # x1
pred_boxes[:, 1::4] = ctr_y[:, None] - t # y1
pred_boxes[:, 2::4] = ctr_x[:, None] + r # x2
pred_boxes[:, 3::4] = ctr_y[:, None] + b # y2
return pred_boxes
def _dense_box_regression_loss(
anchors: List[Boxes],
box2box_transform: Box2BoxTransform,
pred_anchor_deltas: List[torch.Tensor],
gt_boxes: List[torch.Tensor],
fg_mask: torch.Tensor,
box_reg_loss_type="smooth_l1",
smooth_l1_beta=0.0,
):
"""
Compute loss for dense multi-level box regression.
Loss is accumulated over ``fg_mask``.
Args:
anchors: #lvl anchor boxes, each is (HixWixA, 4)
pred_anchor_deltas: #lvl predictions, each is (N, HixWixA, 4)
gt_boxes: N ground truth boxes, each has shape (R, 4) (R = sum(Hi * Wi * A))
fg_mask: the foreground boolean mask of shape (N, R) to compute loss on
box_reg_loss_type (str): Loss type to use. Supported losses: "smooth_l1", "giou",
"diou", "ciou".
smooth_l1_beta (float): beta parameter for the smooth L1 regression loss. Default to
use L1 loss. Only used when `box_reg_loss_type` is "smooth_l1"
"""
anchors = type(anchors[0]).cat(anchors).tensor # (R, 4)
if box_reg_loss_type == "smooth_l1":
gt_anchor_deltas = [box2box_transform.get_deltas(anchors, k) for k in gt_boxes]
gt_anchor_deltas = torch.stack(gt_anchor_deltas) # (N, R, 4)
loss_box_reg = smooth_l1_loss(
cat(pred_anchor_deltas, dim=1)[fg_mask],
gt_anchor_deltas[fg_mask],
beta=smooth_l1_beta,
reduction="sum",
)
elif box_reg_loss_type == "giou":
pred_boxes = [
box2box_transform.apply_deltas(k, anchors) for k in cat(pred_anchor_deltas, dim=1)
]
loss_box_reg = giou_loss(
torch.stack(pred_boxes)[fg_mask], torch.stack(gt_boxes)[fg_mask], reduction="sum"
)
elif box_reg_loss_type == "diou":
pred_boxes = [
box2box_transform.apply_deltas(k, anchors) for k in cat(pred_anchor_deltas, dim=1)
]
loss_box_reg = diou_loss(
torch.stack(pred_boxes)[fg_mask], torch.stack(gt_boxes)[fg_mask], reduction="sum"
)
elif box_reg_loss_type == "ciou":
pred_boxes = [
box2box_transform.apply_deltas(k, anchors) for k in cat(pred_anchor_deltas, dim=1)
]
loss_box_reg = ciou_loss(
torch.stack(pred_boxes)[fg_mask], torch.stack(gt_boxes)[fg_mask], reduction="sum"
)
else:
raise ValueError(f"Invalid dense box regression loss type '{box_reg_loss_type}'")
return loss_box_reg
# Copyright (c) Facebook, Inc. and its affiliates.
from typing import List
import torch
from detectron2.layers import nonzero_tuple
# TODO: the name is too general
class Matcher(object):
"""
This class assigns to each predicted "element" (e.g., a box) a ground-truth
element. Each predicted element will have exactly zero or one matches; each
ground-truth element may be matched to zero or more predicted elements.
The matching is determined by the MxN match_quality_matrix, that characterizes
how well each (ground-truth, prediction)-pair match each other. For example,
if the elements are boxes, this matrix may contain box intersection-over-union
overlap values.
The matcher returns (a) a vector of length N containing the index of the
ground-truth element m in [0, M) that matches to prediction n in [0, N).
(b) a vector of length N containing the labels for each prediction.
"""
def __init__(
self, thresholds: List[float], labels: List[int], allow_low_quality_matches: bool = False
):
"""
Args:
thresholds (list): a list of thresholds used to stratify predictions
into levels.
labels (list): a list of values to label predictions belonging at
each level. A label can be one of {-1, 0, 1} signifying
{ignore, negative class, positive class}, respectively.
allow_low_quality_matches (bool): if True, produce additional matches
for predictions with maximum match quality lower than high_threshold.
See set_low_quality_matches_ for more details.
For example,
thresholds = [0.3, 0.5]
labels = [0, -1, 1]
All predictions with iou < 0.3 will be marked with 0 and
thus will be considered as false positives while training.
All predictions with 0.3 <= iou < 0.5 will be marked with -1 and
thus will be ignored.
All predictions with 0.5 <= iou will be marked with 1 and
thus will be considered as true positives.
"""
# Add -inf and +inf to first and last position in thresholds
thresholds = thresholds[:]
assert thresholds[0] > 0
thresholds.insert(0, -float("inf"))
thresholds.append(float("inf"))
# Currently torchscript does not support all + generator
assert all([low <= high for (low, high) in zip(thresholds[:-1], thresholds[1:])])
assert all([l in [-1, 0, 1] for l in labels])
assert len(labels) == len(thresholds) - 1
self.thresholds = thresholds
self.labels = labels
self.allow_low_quality_matches = allow_low_quality_matches
def __call__(self, match_quality_matrix):
"""
Args:
match_quality_matrix (Tensor[float]): an MxN tensor, containing the
pairwise quality between M ground-truth elements and N predicted
elements. All elements must be >= 0 (due to the us of `torch.nonzero`
for selecting indices in :meth:`set_low_quality_matches_`).
Returns:
matches (Tensor[int64]): a vector of length N, where matches[i] is a matched
ground-truth index in [0, M)
match_labels (Tensor[int8]): a vector of length N, where pred_labels[i] indicates
whether a prediction is a true or false positive or ignored
"""
assert match_quality_matrix.dim() == 2
if match_quality_matrix.numel() == 0:
default_matches = match_quality_matrix.new_full(
(match_quality_matrix.size(1),), 0, dtype=torch.int64
)
# When no gt boxes exist, we define IOU = 0 and therefore set labels
# to `self.labels[0]`, which usually defaults to background class 0
# To choose to ignore instead, can make labels=[-1,0,-1,1] + set appropriate thresholds
default_match_labels = match_quality_matrix.new_full(
(match_quality_matrix.size(1),), self.labels[0], dtype=torch.int8
)
return default_matches, default_match_labels
assert torch.all(match_quality_matrix >= 0)
# match_quality_matrix is M (gt) x N (predicted)
# Max over gt elements (dim 0) to find best gt candidate for each prediction
matched_vals, matches = match_quality_matrix.max(dim=0)
match_labels = matches.new_full(matches.size(), 1, dtype=torch.int8)
for (l, low, high) in zip(self.labels, self.thresholds[:-1], self.thresholds[1:]):
low_high = (matched_vals >= low) & (matched_vals < high)
match_labels[low_high] = l
if self.allow_low_quality_matches:
self.set_low_quality_matches_(match_labels, match_quality_matrix)
return matches, match_labels
def set_low_quality_matches_(self, match_labels, match_quality_matrix):
"""
Produce additional matches for predictions that have only low-quality matches.
Specifically, for each ground-truth G find the set of predictions that have
maximum overlap with it (including ties); for each prediction in that set, if
it is unmatched, then match it to the ground-truth G.
This function implements the RPN assignment case (i) in Sec. 3.1.2 of
:paper:`Faster R-CNN`.
"""
# For each gt, find the prediction with which it has highest quality
highest_quality_foreach_gt, _ = match_quality_matrix.max(dim=1)
# Find the highest quality match available, even if it is low, including ties.
# Note that the matches qualities must be positive due to the use of
# `torch.nonzero`.
_, pred_inds_with_highest_quality = nonzero_tuple(
match_quality_matrix == highest_quality_foreach_gt[:, None]
)
# If an anchor was labeled positive only due to a low-quality match
# with gt_A, but it has larger overlap with gt_B, it's matched index will still be gt_B.
# This follows the implementation in Detectron, and is found to have no significant impact.
match_labels[pred_inds_with_highest_quality] = 1
# -*- coding: utf-8 -*-
# Copyright (c) Facebook, Inc. and its affiliates.
from .build import META_ARCH_REGISTRY, build_model # isort:skip
from .panoptic_fpn import PanopticFPN
# import all the meta_arch, so they will be registered
from .rcnn import GeneralizedRCNN, ProposalNetwork
from .dense_detector import DenseDetector
from .retinanet import RetinaNet
from .semantic_seg import SEM_SEG_HEADS_REGISTRY, SemanticSegmentor, build_sem_seg_head
__all__ = list(globals().keys())
# Copyright (c) Facebook, Inc. and its affiliates.
import torch
from detectron2.utils.logger import _log_api_usage
from detectron2.utils.registry import Registry
META_ARCH_REGISTRY = Registry("META_ARCH") # noqa F401 isort:skip
META_ARCH_REGISTRY.__doc__ = """
Registry for meta-architectures, i.e. the whole model.
The registered object will be called with `obj(cfg)`
and expected to return a `nn.Module` object.
"""
def build_model(cfg):
"""
Build the whole model architecture, defined by ``cfg.MODEL.META_ARCHITECTURE``.
Note that it does not load any weights from ``cfg``.
"""
meta_arch = cfg.MODEL.META_ARCHITECTURE
model = META_ARCH_REGISTRY.get(meta_arch)(cfg)
model.to(torch.device(cfg.MODEL.DEVICE))
_log_api_usage("modeling.meta_arch." + meta_arch)
return model
import numpy as np
from typing import Dict, List, Optional, Tuple
import torch
from torch import Tensor, nn
from detectron2.data.detection_utils import convert_image_to_rgb
from detectron2.modeling import Backbone
from detectron2.structures import Boxes, ImageList, Instances
from detectron2.utils.events import get_event_storage
from ..postprocessing import detector_postprocess
def permute_to_N_HWA_K(tensor, K: int):
"""
Transpose/reshape a tensor from (N, (Ai x K), H, W) to (N, (HxWxAi), K)
"""
assert tensor.dim() == 4, tensor.shape
N, _, H, W = tensor.shape
tensor = tensor.view(N, -1, K, H, W)
tensor = tensor.permute(0, 3, 4, 1, 2)
tensor = tensor.reshape(N, -1, K) # Size=(N,HWA,K)
return tensor
class DenseDetector(nn.Module):
"""
Base class for dense detector. We define a dense detector as a fully-convolutional model that
makes per-pixel (i.e. dense) predictions.
"""
def __init__(
self,
backbone: Backbone,
head: nn.Module,
head_in_features: Optional[List[str]] = None,
*,
pixel_mean,
pixel_std,
):
"""
Args:
backbone: backbone module
head: head module
head_in_features: backbone features to use in head. Default to all backbone features.
pixel_mean (Tuple[float]):
Values to be used for image normalization (BGR order).
To train on images of different number of channels, set different mean & std.
Default values are the mean pixel value from ImageNet: [103.53, 116.28, 123.675]
pixel_std (Tuple[float]):
When using pre-trained models in Detectron1 or any MSRA models,
std has been absorbed into its conv1 weights, so the std needs to be set 1.
Otherwise, you can use [57.375, 57.120, 58.395] (ImageNet std)
"""
super().__init__()
self.backbone = backbone
self.head = head
if head_in_features is None:
shapes = self.backbone.output_shape()
self.head_in_features = sorted(shapes.keys(), key=lambda x: shapes[x].stride)
else:
self.head_in_features = head_in_features
self.register_buffer("pixel_mean", torch.tensor(pixel_mean).view(-1, 1, 1), False)
self.register_buffer("pixel_std", torch.tensor(pixel_std).view(-1, 1, 1), False)
@property
def device(self):
return self.pixel_mean.device
def forward(self, batched_inputs: List[Dict[str, Tensor]]):
"""
Args:
batched_inputs: a list, batched outputs of :class:`DatasetMapper` .
Each item in the list contains the inputs for one image.
For now, each item in the list is a dict that contains:
* image: Tensor, image in (C, H, W) format.
* instances: Instances
Other information that's included in the original dicts, such as:
* "height", "width" (int): the output resolution of the model, used in inference.
See :meth:`postprocess` for details.
Returns:
In training, dict[str, Tensor]: mapping from a named loss to a tensor storing the
loss. Used during training only. In inference, the standard output format, described
in :doc:`/tutorials/models`.
"""
images = self.preprocess_image(batched_inputs)
features = self.backbone(images.tensor)
features = [features[f] for f in self.head_in_features]
predictions = self.head(features)
if self.training:
assert not torch.jit.is_scripting(), "Not supported"
assert "instances" in batched_inputs[0], "Instance annotations are missing in training!"
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
return self.forward_training(images, features, predictions, gt_instances)
else:
results = self.forward_inference(images, features, predictions)
if torch.jit.is_scripting():
return results
processed_results = []
for results_per_image, input_per_image, image_size in zip(
results, batched_inputs, images.image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
r = detector_postprocess(results_per_image, height, width)
processed_results.append({"instances": r})
return processed_results
def forward_training(self, images, features, predictions, gt_instances):
raise NotImplementedError()
def preprocess_image(self, batched_inputs: List[Dict[str, Tensor]]):
"""
Normalize, pad and batch the input images.
"""
images = [x["image"].to(self.device) for x in batched_inputs]
images = [(x - self.pixel_mean) / self.pixel_std for x in images]
images = ImageList.from_tensors(images, self.backbone.size_divisibility)
return images
def _transpose_dense_predictions(
self, predictions: List[List[Tensor]], dims_per_anchor: List[int]
) -> List[List[Tensor]]:
"""
Transpose the dense per-level predictions.
Args:
predictions: a list of outputs, each is a list of per-level
predictions with shape (N, Ai x K, Hi, Wi), where N is the
number of images, Ai is the number of anchors per location on
level i, K is the dimension of predictions per anchor.
dims_per_anchor: the value of K for each predictions. e.g. 4 for
box prediction, #classes for classification prediction.
Returns:
List[List[Tensor]]: each prediction is transposed to (N, Hi x Wi x Ai, K).
"""
assert len(predictions) == len(dims_per_anchor)
res: List[List[Tensor]] = []
for pred, dim_per_anchor in zip(predictions, dims_per_anchor):
pred = [permute_to_N_HWA_K(x, dim_per_anchor) for x in pred]
res.append(pred)
return res
def _ema_update(self, name: str, value: float, initial_value: float, momentum: float = 0.9):
"""
Apply EMA update to `self.name` using `value`.
This is mainly used for loss normalizer. In Detectron1, loss is normalized by number
of foreground samples in the batch. When batch size is 1 per GPU, #foreground has a
large variance and using it lead to lower performance. Therefore we maintain an EMA of
#foreground to stabilize the normalizer.
Args:
name: name of the normalizer
value: the new value to update
initial_value: the initial value to start with
momentum: momentum of EMA
Returns:
float: the updated EMA value
"""
if hasattr(self, name):
old = getattr(self, name)
else:
old = initial_value
new = old * momentum + value * (1 - momentum)
setattr(self, name, new)
return new
def _decode_per_level_predictions(
self,
anchors: Boxes,
pred_scores: Tensor,
pred_deltas: Tensor,
score_thresh: float,
topk_candidates: int,
image_size: Tuple[int, int],
) -> Instances:
"""
Decode boxes and classification predictions of one featuer level, by
the following steps:
1. filter the predictions based on score threshold and top K scores.
2. transform the box regression outputs
3. return the predicted scores, classes and boxes
Args:
anchors: Boxes, anchor for this feature level
pred_scores: HxWxA,K
pred_deltas: HxWxA,4
Returns:
Instances: with field "scores", "pred_boxes", "pred_classes".
"""
# Apply two filtering to make NMS faster.
# 1. Keep boxes with confidence score higher than threshold
keep_idxs = pred_scores > score_thresh
pred_scores = pred_scores[keep_idxs]
topk_idxs = torch.nonzero(keep_idxs) # Kx2
# 2. Keep top k top scoring boxes only
num_topk = min(topk_candidates, topk_idxs.size(0))
# torch.sort is actually faster than .topk (https://github.com/pytorch/pytorch/issues/22812)
pred_scores, idxs = pred_scores.sort(descending=True)
pred_scores = pred_scores[:num_topk]
topk_idxs = topk_idxs[idxs[:num_topk]]
anchor_idxs, classes_idxs = topk_idxs.unbind(dim=1)
pred_boxes = self.box2box_transform.apply_deltas(
pred_deltas[anchor_idxs], anchors.tensor[anchor_idxs]
)
return Instances(
image_size, pred_boxes=Boxes(pred_boxes), scores=pred_scores, pred_classes=classes_idxs
)
def _decode_multi_level_predictions(
self,
anchors: List[Boxes],
pred_scores: List[Tensor],
pred_deltas: List[Tensor],
score_thresh: float,
topk_candidates: int,
image_size: Tuple[int, int],
) -> Instances:
"""
Run `_decode_per_level_predictions` for all feature levels and concat the results.
"""
predictions = [
self._decode_per_level_predictions(
anchors_i,
box_cls_i,
box_reg_i,
self.test_score_thresh,
self.test_topk_candidates,
image_size,
)
# Iterate over every feature level
for box_cls_i, box_reg_i, anchors_i in zip(pred_scores, pred_deltas, anchors)
]
return predictions[0].cat(predictions) # 'Instances.cat' is not scriptale but this is
def visualize_training(self, batched_inputs, results):
"""
A function used to visualize ground truth images and final network predictions.
It shows ground truth bounding boxes on the original image and up to 20
predicted object bounding boxes on the original image.
Args:
batched_inputs (list): a list that contains input to the model.
results (List[Instances]): a list of #images elements returned by forward_inference().
"""
from detectron2.utils.visualizer import Visualizer
assert len(batched_inputs) == len(
results
), "Cannot visualize inputs and results of different sizes"
storage = get_event_storage()
max_boxes = 20
image_index = 0 # only visualize a single image
img = batched_inputs[image_index]["image"]
img = convert_image_to_rgb(img.permute(1, 2, 0), self.input_format)
v_gt = Visualizer(img, None)
v_gt = v_gt.overlay_instances(boxes=batched_inputs[image_index]["instances"].gt_boxes)
anno_img = v_gt.get_image()
processed_results = detector_postprocess(results[image_index], img.shape[0], img.shape[1])
predicted_boxes = processed_results.pred_boxes.tensor.detach().cpu().numpy()
v_pred = Visualizer(img, None)
v_pred = v_pred.overlay_instances(boxes=predicted_boxes[0:max_boxes])
prop_img = v_pred.get_image()
vis_img = np.vstack((anno_img, prop_img))
vis_img = vis_img.transpose(2, 0, 1)
vis_name = f"Top: GT bounding boxes; Bottom: {max_boxes} Highest Scoring Results"
storage.put_image(vis_name, vis_img)
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
from typing import List, Optional, Tuple
import torch
from fvcore.nn import sigmoid_focal_loss_jit
from torch import Tensor, nn
from torch.nn import functional as F
from detectron2.layers import ShapeSpec, batched_nms
from detectron2.structures import Boxes, ImageList, Instances, pairwise_point_box_distance
from detectron2.utils.events import get_event_storage
from ..anchor_generator import DefaultAnchorGenerator
from ..backbone import Backbone
from ..box_regression import Box2BoxTransformLinear, _dense_box_regression_loss
from .dense_detector import DenseDetector
from .retinanet import RetinaNetHead
__all__ = ["FCOS"]
logger = logging.getLogger(__name__)
class FCOS(DenseDetector):
"""
Implement FCOS in :paper:`fcos`.
"""
def __init__(
self,
*,
backbone: Backbone,
head: nn.Module,
head_in_features: Optional[List[str]] = None,
box2box_transform=None,
num_classes,
center_sampling_radius: float = 1.5,
focal_loss_alpha=0.25,
focal_loss_gamma=2.0,
test_score_thresh=0.2,
test_topk_candidates=1000,
test_nms_thresh=0.6,
max_detections_per_image=100,
pixel_mean,
pixel_std,
):
"""
Args:
center_sampling_radius: radius of the "center" of a groundtruth box,
within which all anchor points are labeled positive.
Other arguments mean the same as in :class:`RetinaNet`.
"""
super().__init__(
backbone, head, head_in_features, pixel_mean=pixel_mean, pixel_std=pixel_std
)
self.num_classes = num_classes
# FCOS uses one anchor point per location.
# We represent the anchor point by a box whose size equals the anchor stride.
feature_shapes = backbone.output_shape()
fpn_strides = [feature_shapes[k].stride for k in self.head_in_features]
self.anchor_generator = DefaultAnchorGenerator(
sizes=[[k] for k in fpn_strides], aspect_ratios=[1.0], strides=fpn_strides
)
# FCOS parameterizes box regression by a linear transform,
# where predictions are normalized by anchor stride (equal to anchor size).
if box2box_transform is None:
box2box_transform = Box2BoxTransformLinear(normalize_by_size=True)
self.box2box_transform = box2box_transform
self.center_sampling_radius = float(center_sampling_radius)
# Loss parameters:
self.focal_loss_alpha = focal_loss_alpha
self.focal_loss_gamma = focal_loss_gamma
# Inference parameters:
self.test_score_thresh = test_score_thresh
self.test_topk_candidates = test_topk_candidates
self.test_nms_thresh = test_nms_thresh
self.max_detections_per_image = max_detections_per_image
def forward_training(self, images, features, predictions, gt_instances):
# Transpose the Hi*Wi*A dimension to the middle:
pred_logits, pred_anchor_deltas, pred_centerness = self._transpose_dense_predictions(
predictions, [self.num_classes, 4, 1]
)
anchors = self.anchor_generator(features)
gt_labels, gt_boxes = self.label_anchors(anchors, gt_instances)
return self.losses(
anchors, pred_logits, gt_labels, pred_anchor_deltas, gt_boxes, pred_centerness
)
@torch.no_grad()
def match_anchors(self, anchors: List[Boxes], gt_instances: List[Instances]):
"""
Match anchors with ground truth boxes.
Args:
anchors: #level boxes, from the highest resolution to lower resolution
gt_instances: ground truth instances per image
Returns:
List[Tensor]:
#image tensors, each is a vector of matched gt
indices (or -1 for unmatched anchors) for all anchors.
"""
num_anchors_per_level = [len(x) for x in anchors]
anchors = Boxes.cat(anchors) # Rx4
anchor_centers = anchors.get_centers() # Rx2
anchor_sizes = anchors.tensor[:, 2] - anchors.tensor[:, 0] # R
lower_bound = anchor_sizes * 4
lower_bound[: num_anchors_per_level[0]] = 0
upper_bound = anchor_sizes * 8
upper_bound[-num_anchors_per_level[-1] :] = float("inf")
matched_indices = []
for gt_per_image in gt_instances:
gt_centers = gt_per_image.gt_boxes.get_centers() # Nx2
# FCOS with center sampling: anchor point must be close enough to gt center.
pairwise_match = (anchor_centers[:, None, :] - gt_centers[None, :, :]).abs_().max(
dim=2
).values < self.center_sampling_radius * anchor_sizes[:, None]
pairwise_dist = pairwise_point_box_distance(anchor_centers, gt_per_image.gt_boxes)
# The original FCOS anchor matching rule: anchor point must be inside gt
pairwise_match &= pairwise_dist.min(dim=2).values > 0
# Multilevel anchor matching in FCOS: each anchor is only responsible
# for certain scale range.
pairwise_dist = pairwise_dist.max(dim=2).values
pairwise_match &= (pairwise_dist > lower_bound[:, None]) & (
pairwise_dist < upper_bound[:, None]
)
# Match the GT box with minimum area, if there are multiple GT matches
gt_areas = gt_per_image.gt_boxes.area() # N
pairwise_match = pairwise_match.to(torch.float32) * (1e8 - gt_areas[None, :])
min_values, matched_idx = pairwise_match.max(dim=1) # R, per-anchor match
matched_idx[min_values < 1e-5] = -1 # Unmatched anchors are assigned -1
matched_indices.append(matched_idx)
return matched_indices
@torch.no_grad()
def label_anchors(self, anchors, gt_instances):
"""
Same interface as :meth:`RetinaNet.label_anchors`, but implemented with FCOS
anchor matching rule.
Unlike RetinaNet, there are no ignored anchors.
"""
matched_indices = self.match_anchors(anchors, gt_instances)
matched_labels, matched_boxes = [], []
for gt_index, gt_per_image in zip(matched_indices, gt_instances):
label = gt_per_image.gt_classes[gt_index.clip(min=0)]
label[gt_index < 0] = self.num_classes # background
matched_gt_boxes = gt_per_image.gt_boxes[gt_index.clip(min=0)]
matched_labels.append(label)
matched_boxes.append(matched_gt_boxes)
return matched_labels, matched_boxes
def losses(
self, anchors, pred_logits, gt_labels, pred_anchor_deltas, gt_boxes, pred_centerness
):
"""
This method is almost identical to :meth:`RetinaNet.losses`, with an extra
"loss_centerness" in the returned dict.
"""
num_images = len(gt_labels)
gt_labels = torch.stack(gt_labels) # (N, R)
pos_mask = (gt_labels >= 0) & (gt_labels != self.num_classes)
num_pos_anchors = pos_mask.sum().item()
get_event_storage().put_scalar("num_pos_anchors", num_pos_anchors / num_images)
normalizer = self._ema_update("loss_normalizer", max(num_pos_anchors, 1), 300)
# classification and regression loss
gt_labels_target = F.one_hot(gt_labels, num_classes=self.num_classes + 1)[
:, :, :-1
] # no loss for the last (background) class
loss_cls = sigmoid_focal_loss_jit(
torch.cat(pred_logits, dim=1),
gt_labels_target.to(pred_logits[0].dtype),
alpha=self.focal_loss_alpha,
gamma=self.focal_loss_gamma,
reduction="sum",
)
loss_box_reg = _dense_box_regression_loss(
anchors,
self.box2box_transform,
pred_anchor_deltas,
[x.tensor for x in gt_boxes],
pos_mask,
box_reg_loss_type="giou",
)
ctrness_targets = self.compute_ctrness_targets(anchors, gt_boxes) # NxR
pred_centerness = torch.cat(pred_centerness, dim=1).squeeze(dim=2) # NxR
ctrness_loss = F.binary_cross_entropy_with_logits(
pred_centerness[pos_mask], ctrness_targets[pos_mask], reduction="sum"
)
return {
"loss_fcos_cls": loss_cls / normalizer,
"loss_fcos_loc": loss_box_reg / normalizer,
"loss_fcos_ctr": ctrness_loss / normalizer,
}
def compute_ctrness_targets(self, anchors, gt_boxes): # NxR
anchors = Boxes.cat(anchors).tensor # Rx4
reg_targets = [self.box2box_transform.get_deltas(anchors, m.tensor) for m in gt_boxes]
reg_targets = torch.stack(reg_targets, dim=0) # NxRx4
if len(reg_targets) == 0:
return reg_targets.new_zeros(len(reg_targets))
left_right = reg_targets[:, :, [0, 2]]
top_bottom = reg_targets[:, :, [1, 3]]
ctrness = (left_right.min(dim=-1)[0] / left_right.max(dim=-1)[0]) * (
top_bottom.min(dim=-1)[0] / top_bottom.max(dim=-1)[0]
)
return torch.sqrt(ctrness)
def forward_inference(
self, images: ImageList, features: List[Tensor], predictions: List[List[Tensor]]
):
pred_logits, pred_anchor_deltas, pred_centerness = self._transpose_dense_predictions(
predictions, [self.num_classes, 4, 1]
)
anchors = self.anchor_generator(features)
results: List[Instances] = []
for img_idx, image_size in enumerate(images.image_sizes):
scores_per_image = [
# Multiply and sqrt centerness & classification scores
# (See eqn. 4 in https://arxiv.org/abs/2006.09214)
torch.sqrt(x[img_idx].sigmoid_() * y[img_idx].sigmoid_())
for x, y in zip(pred_logits, pred_centerness)
]
deltas_per_image = [x[img_idx] for x in pred_anchor_deltas]
results_per_image = self.inference_single_image(
anchors, scores_per_image, deltas_per_image, image_size
)
results.append(results_per_image)
return results
def inference_single_image(
self,
anchors: List[Boxes],
box_cls: List[Tensor],
box_delta: List[Tensor],
image_size: Tuple[int, int],
):
"""
Identical to :meth:`RetinaNet.inference_single_image.
"""
pred = self._decode_multi_level_predictions(
anchors,
box_cls,
box_delta,
self.test_score_thresh,
self.test_topk_candidates,
image_size,
)
keep = batched_nms(
pred.pred_boxes.tensor, pred.scores, pred.pred_classes, self.test_nms_thresh
)
return pred[keep[: self.max_detections_per_image]]
class FCOSHead(RetinaNetHead):
"""
The head used in :paper:`fcos`. It adds an additional centerness
prediction branch on top of :class:`RetinaNetHead`.
"""
def __init__(self, *, input_shape: List[ShapeSpec], conv_dims: List[int], **kwargs):
super().__init__(input_shape=input_shape, conv_dims=conv_dims, num_anchors=1, **kwargs)
# Unlike original FCOS, we do not add an additional learnable scale layer
# because it's found to have no benefits after normalizing regression targets by stride.
self._num_features = len(input_shape)
self.ctrness = nn.Conv2d(conv_dims[-1], 1, kernel_size=3, stride=1, padding=1)
torch.nn.init.normal_(self.ctrness.weight, std=0.01)
torch.nn.init.constant_(self.ctrness.bias, 0)
def forward(self, features):
assert len(features) == self._num_features
logits = []
bbox_reg = []
ctrness = []
for feature in features:
logits.append(self.cls_score(self.cls_subnet(feature)))
bbox_feature = self.bbox_subnet(feature)
bbox_reg.append(self.bbox_pred(bbox_feature))
ctrness.append(self.ctrness(bbox_feature))
return logits, bbox_reg, ctrness
# -*- coding: utf-8 -*-
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
from typing import Dict, List
import torch
from torch import nn
from detectron2.config import configurable
from detectron2.structures import ImageList
from ..postprocessing import detector_postprocess, sem_seg_postprocess
from .build import META_ARCH_REGISTRY
from .rcnn import GeneralizedRCNN
from .semantic_seg import build_sem_seg_head
__all__ = ["PanopticFPN"]
@META_ARCH_REGISTRY.register()
class PanopticFPN(GeneralizedRCNN):
"""
Implement the paper :paper:`PanopticFPN`.
"""
@configurable
def __init__(
self,
*,
sem_seg_head: nn.Module,
combine_overlap_thresh: float = 0.5,
combine_stuff_area_thresh: float = 4096,
combine_instances_score_thresh: float = 0.5,
**kwargs,
):
"""
NOTE: this interface is experimental.
Args:
sem_seg_head: a module for the semantic segmentation head.
combine_overlap_thresh: combine masks into one instances if
they have enough overlap
combine_stuff_area_thresh: ignore stuff areas smaller than this threshold
combine_instances_score_thresh: ignore instances whose score is
smaller than this threshold
Other arguments are the same as :class:`GeneralizedRCNN`.
"""
super().__init__(**kwargs)
self.sem_seg_head = sem_seg_head
# options when combining instance & semantic outputs
self.combine_overlap_thresh = combine_overlap_thresh
self.combine_stuff_area_thresh = combine_stuff_area_thresh
self.combine_instances_score_thresh = combine_instances_score_thresh
@classmethod
def from_config(cls, cfg):
ret = super().from_config(cfg)
ret.update(
{
"combine_overlap_thresh": cfg.MODEL.PANOPTIC_FPN.COMBINE.OVERLAP_THRESH,
"combine_stuff_area_thresh": cfg.MODEL.PANOPTIC_FPN.COMBINE.STUFF_AREA_LIMIT,
"combine_instances_score_thresh": cfg.MODEL.PANOPTIC_FPN.COMBINE.INSTANCES_CONFIDENCE_THRESH, # noqa
}
)
ret["sem_seg_head"] = build_sem_seg_head(cfg, ret["backbone"].output_shape())
logger = logging.getLogger(__name__)
if not cfg.MODEL.PANOPTIC_FPN.COMBINE.ENABLED:
logger.warning(
"PANOPTIC_FPN.COMBINED.ENABLED is no longer used. "
" model.inference(do_postprocess=) should be used to toggle postprocessing."
)
if cfg.MODEL.PANOPTIC_FPN.INSTANCE_LOSS_WEIGHT != 1.0:
w = cfg.MODEL.PANOPTIC_FPN.INSTANCE_LOSS_WEIGHT
logger.warning(
"PANOPTIC_FPN.INSTANCE_LOSS_WEIGHT should be replaced by weights on each ROI head."
)
def update_weight(x):
if isinstance(x, dict):
return {k: v * w for k, v in x.items()}
else:
return x * w
roi_heads = ret["roi_heads"]
roi_heads.box_predictor.loss_weight = update_weight(roi_heads.box_predictor.loss_weight)
roi_heads.mask_head.loss_weight = update_weight(roi_heads.mask_head.loss_weight)
return ret
def forward(self, batched_inputs):
"""
Args:
batched_inputs: a list, batched outputs of :class:`DatasetMapper`.
Each item in the list contains the inputs for one image.
For now, each item in the list is a dict that contains:
* "image": Tensor, image in (C, H, W) format.
* "instances": Instances
* "sem_seg": semantic segmentation ground truth.
* Other information that's included in the original dicts, such as:
"height", "width" (int): the output resolution of the model, used in inference.
See :meth:`postprocess` for details.
Returns:
list[dict]:
each dict has the results for one image. The dict contains the following keys:
* "instances": see :meth:`GeneralizedRCNN.forward` for its format.
* "sem_seg": see :meth:`SemanticSegmentor.forward` for its format.
* "panoptic_seg": See the return value of
:func:`combine_semantic_and_instance_outputs` for its format.
"""
if not self.training:
return self.inference(batched_inputs)
images = self.preprocess_image(batched_inputs)
features = self.backbone(images.tensor)
assert "sem_seg" in batched_inputs[0]
gt_sem_seg = [x["sem_seg"].to(self.device) for x in batched_inputs]
gt_sem_seg = ImageList.from_tensors(
gt_sem_seg, self.backbone.size_divisibility, self.sem_seg_head.ignore_value
).tensor
sem_seg_results, sem_seg_losses = self.sem_seg_head(features, gt_sem_seg)
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
proposals, proposal_losses = self.proposal_generator(images, features, gt_instances)
detector_results, detector_losses = self.roi_heads(
images, features, proposals, gt_instances
)
losses = sem_seg_losses
losses.update(proposal_losses)
losses.update(detector_losses)
return losses
def inference(self, batched_inputs: List[Dict[str, torch.Tensor]], do_postprocess: bool = True):
"""
Run inference on the given inputs.
Args:
batched_inputs (list[dict]): same as in :meth:`forward`
do_postprocess (bool): whether to apply post-processing on the outputs.
Returns:
When do_postprocess=True, see docs in :meth:`forward`.
Otherwise, returns a (list[Instances], list[Tensor]) that contains
the raw detector outputs, and raw semantic segmentation outputs.
"""
images = self.preprocess_image(batched_inputs)
features = self.backbone(images.tensor)
sem_seg_results, sem_seg_losses = self.sem_seg_head(features, None)
proposals, _ = self.proposal_generator(images, features, None)
detector_results, _ = self.roi_heads(images, features, proposals, None)
if do_postprocess:
processed_results = []
for sem_seg_result, detector_result, input_per_image, image_size in zip(
sem_seg_results, detector_results, batched_inputs, images.image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
sem_seg_r = sem_seg_postprocess(sem_seg_result, image_size, height, width)
detector_r = detector_postprocess(detector_result, height, width)
processed_results.append({"sem_seg": sem_seg_r, "instances": detector_r})
panoptic_r = combine_semantic_and_instance_outputs(
detector_r,
sem_seg_r.argmax(dim=0),
self.combine_overlap_thresh,
self.combine_stuff_area_thresh,
self.combine_instances_score_thresh,
)
processed_results[-1]["panoptic_seg"] = panoptic_r
return processed_results
else:
return detector_results, sem_seg_results
def combine_semantic_and_instance_outputs(
instance_results,
semantic_results,
overlap_threshold,
stuff_area_thresh,
instances_score_thresh,
):
"""
Implement a simple combining logic following
"combine_semantic_and_instance_predictions.py" in panopticapi
to produce panoptic segmentation outputs.
Args:
instance_results: output of :func:`detector_postprocess`.
semantic_results: an (H, W) tensor, each element is the contiguous semantic
category id
Returns:
panoptic_seg (Tensor): of shape (height, width) where the values are ids for each segment.
segments_info (list[dict]): Describe each segment in `panoptic_seg`.
Each dict contains keys "id", "category_id", "isthing".
"""
panoptic_seg = torch.zeros_like(semantic_results, dtype=torch.int32)
# sort instance outputs by scores
sorted_inds = torch.argsort(-instance_results.scores)
current_segment_id = 0
segments_info = []
instance_masks = instance_results.pred_masks.to(dtype=torch.bool, device=panoptic_seg.device)
# Add instances one-by-one, check for overlaps with existing ones
for inst_id in sorted_inds:
score = instance_results.scores[inst_id].item()
if score < instances_score_thresh:
break
mask = instance_masks[inst_id] # H,W
mask_area = mask.sum().item()
if mask_area == 0:
continue
intersect = (mask > 0) & (panoptic_seg > 0)
intersect_area = intersect.sum().item()
if intersect_area * 1.0 / mask_area > overlap_threshold:
continue
if intersect_area > 0:
mask = mask & (panoptic_seg == 0)
current_segment_id += 1
panoptic_seg[mask] = current_segment_id
segments_info.append(
{
"id": current_segment_id,
"isthing": True,
"score": score,
"category_id": instance_results.pred_classes[inst_id].item(),
"instance_id": inst_id.item(),
}
)
# Add semantic results to remaining empty areas
semantic_labels = torch.unique(semantic_results).cpu().tolist()
for semantic_label in semantic_labels:
if semantic_label == 0: # 0 is a special "thing" class
continue
mask = (semantic_results == semantic_label) & (panoptic_seg == 0)
mask_area = mask.sum().item()
if mask_area < stuff_area_thresh:
continue
current_segment_id += 1
panoptic_seg[mask] = current_segment_id
segments_info.append(
{
"id": current_segment_id,
"isthing": False,
"category_id": semantic_label,
"area": mask_area,
}
)
return panoptic_seg, segments_info
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
import numpy as np
from typing import Dict, List, Optional, Tuple
import torch
from torch import nn
from detectron2.config import configurable
from detectron2.data.detection_utils import convert_image_to_rgb
from detectron2.structures import ImageList, Instances
from detectron2.utils.events import get_event_storage
from detectron2.utils.logger import log_first_n
from ..backbone import Backbone, build_backbone
from ..postprocessing import detector_postprocess
from ..proposal_generator import build_proposal_generator
from ..roi_heads import build_roi_heads
from .build import META_ARCH_REGISTRY
__all__ = ["GeneralizedRCNN", "ProposalNetwork"]
@META_ARCH_REGISTRY.register()
class GeneralizedRCNN(nn.Module):
"""
Generalized R-CNN. Any models that contains the following three components:
1. Per-image feature extraction (aka backbone)
2. Region proposal generation
3. Per-region feature extraction and prediction
"""
@configurable
def __init__(
self,
*,
backbone: Backbone,
proposal_generator: nn.Module,
roi_heads: nn.Module,
pixel_mean: Tuple[float],
pixel_std: Tuple[float],
input_format: Optional[str] = None,
vis_period: int = 0,
):
"""
Args:
backbone: a backbone module, must follow detectron2's backbone interface
proposal_generator: a module that generates proposals using backbone features
roi_heads: a ROI head that performs per-region computation
pixel_mean, pixel_std: list or tuple with #channels element, representing
the per-channel mean and std to be used to normalize the input image
input_format: describe the meaning of channels of input. Needed by visualization
vis_period: the period to run visualization. Set to 0 to disable.
"""
super().__init__()
self.backbone = backbone
self.proposal_generator = proposal_generator
self.roi_heads = roi_heads
self.input_format = input_format
self.vis_period = vis_period
if vis_period > 0:
assert input_format is not None, "input_format is required for visualization!"
self.register_buffer("pixel_mean", torch.tensor(pixel_mean).view(-1, 1, 1), False)
self.register_buffer("pixel_std", torch.tensor(pixel_std).view(-1, 1, 1), False)
assert (
self.pixel_mean.shape == self.pixel_std.shape
), f"{self.pixel_mean} and {self.pixel_std} have different shapes!"
@classmethod
def from_config(cls, cfg):
backbone = build_backbone(cfg)
return {
"backbone": backbone,
"proposal_generator": build_proposal_generator(cfg, backbone.output_shape()),
"roi_heads": build_roi_heads(cfg, backbone.output_shape()),
"input_format": cfg.INPUT.FORMAT,
"vis_period": cfg.VIS_PERIOD,
"pixel_mean": cfg.MODEL.PIXEL_MEAN,
"pixel_std": cfg.MODEL.PIXEL_STD,
}
@property
def device(self):
return self.pixel_mean.device
def visualize_training(self, batched_inputs, proposals):
"""
A function used to visualize images and proposals. It shows ground truth
bounding boxes on the original image and up to 20 top-scoring predicted
object proposals on the original image. Users can implement different
visualization functions for different models.
Args:
batched_inputs (list): a list that contains input to the model.
proposals (list): a list that contains predicted proposals. Both
batched_inputs and proposals should have the same length.
"""
from detectron2.utils.visualizer import Visualizer
storage = get_event_storage()
max_vis_prop = 20
for input, prop in zip(batched_inputs, proposals):
img = input["image"]
img = convert_image_to_rgb(img.permute(1, 2, 0), self.input_format)
v_gt = Visualizer(img, None)
v_gt = v_gt.overlay_instances(boxes=input["instances"].gt_boxes)
anno_img = v_gt.get_image()
box_size = min(len(prop.proposal_boxes), max_vis_prop)
v_pred = Visualizer(img, None)
v_pred = v_pred.overlay_instances(
boxes=prop.proposal_boxes[0:box_size].tensor.cpu().numpy()
)
prop_img = v_pred.get_image()
vis_img = np.concatenate((anno_img, prop_img), axis=1)
vis_img = vis_img.transpose(2, 0, 1)
vis_name = "Left: GT bounding boxes; Right: Predicted proposals"
storage.put_image(vis_name, vis_img)
break # only visualize one image in a batch
def forward(self, batched_inputs: List[Dict[str, torch.Tensor]]):
"""
Args:
batched_inputs: a list, batched outputs of :class:`DatasetMapper` .
Each item in the list contains the inputs for one image.
For now, each item in the list is a dict that contains:
* image: Tensor, image in (C, H, W) format.
* instances (optional): groundtruth :class:`Instances`
* proposals (optional): :class:`Instances`, precomputed proposals.
Other information that's included in the original dicts, such as:
* "height", "width" (int): the output resolution of the model, used in inference.
See :meth:`postprocess` for details.
Returns:
list[dict]:
Each dict is the output for one input image.
The dict contains one key "instances" whose value is a :class:`Instances`.
The :class:`Instances` object has the following keys:
"pred_boxes", "pred_classes", "scores", "pred_masks", "pred_keypoints"
"""
if not self.training:
return self.inference(batched_inputs)
images = self.preprocess_image(batched_inputs)
if "instances" in batched_inputs[0]:
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
else:
gt_instances = None
features = self.backbone(images.tensor)
if self.proposal_generator is not None:
proposals, proposal_losses = self.proposal_generator(images, features, gt_instances)
else:
assert "proposals" in batched_inputs[0]
proposals = [x["proposals"].to(self.device) for x in batched_inputs]
proposal_losses = {}
_, detector_losses = self.roi_heads(images, features, proposals, gt_instances)
if self.vis_period > 0:
storage = get_event_storage()
if storage.iter % self.vis_period == 0:
self.visualize_training(batched_inputs, proposals)
losses = {}
losses.update(detector_losses)
losses.update(proposal_losses)
return losses
def inference(
self,
batched_inputs: List[Dict[str, torch.Tensor]],
detected_instances: Optional[List[Instances]] = None,
do_postprocess: bool = True,
):
"""
Run inference on the given inputs.
Args:
batched_inputs (list[dict]): same as in :meth:`forward`
detected_instances (None or list[Instances]): if not None, it
contains an `Instances` object per image. The `Instances`
object contains "pred_boxes" and "pred_classes" which are
known boxes in the image.
The inference will then skip the detection of bounding boxes,
and only predict other per-ROI outputs.
do_postprocess (bool): whether to apply post-processing on the outputs.
Returns:
When do_postprocess=True, same as in :meth:`forward`.
Otherwise, a list[Instances] containing raw network outputs.
"""
assert not self.training
images = self.preprocess_image(batched_inputs)
features = self.backbone(images.tensor)
if detected_instances is None:
if self.proposal_generator is not None:
proposals, _ = self.proposal_generator(images, features, None)
else:
assert "proposals" in batched_inputs[0]
proposals = [x["proposals"].to(self.device) for x in batched_inputs]
results, _ = self.roi_heads(images, features, proposals, None)
else:
detected_instances = [x.to(self.device) for x in detected_instances]
results = self.roi_heads.forward_with_given_boxes(features, detected_instances)
if do_postprocess:
assert not torch.jit.is_scripting(), "Scripting is not supported for postprocess."
return GeneralizedRCNN._postprocess(results, batched_inputs, images.image_sizes)
else:
return results
def preprocess_image(self, batched_inputs: List[Dict[str, torch.Tensor]]):
"""
Normalize, pad and batch the input images.
"""
images = [x["image"].to(self.device) for x in batched_inputs]
images = [(x - self.pixel_mean) / self.pixel_std for x in images]
images = ImageList.from_tensors(images, self.backbone.size_divisibility)
return images
@staticmethod
def _postprocess(instances, batched_inputs: List[Dict[str, torch.Tensor]], image_sizes):
"""
Rescale the output instances to the target size.
"""
# note: private function; subject to changes
processed_results = []
for results_per_image, input_per_image, image_size in zip(
instances, batched_inputs, image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
r = detector_postprocess(results_per_image, height, width)
processed_results.append({"instances": r})
return processed_results
@META_ARCH_REGISTRY.register()
class ProposalNetwork(nn.Module):
"""
A meta architecture that only predicts object proposals.
"""
@configurable
def __init__(
self,
*,
backbone: Backbone,
proposal_generator: nn.Module,
pixel_mean: Tuple[float],
pixel_std: Tuple[float],
):
"""
Args:
backbone: a backbone module, must follow detectron2's backbone interface
proposal_generator: a module that generates proposals using backbone features
pixel_mean, pixel_std: list or tuple with #channels element, representing
the per-channel mean and std to be used to normalize the input image
"""
super().__init__()
self.backbone = backbone
self.proposal_generator = proposal_generator
self.register_buffer("pixel_mean", torch.tensor(pixel_mean).view(-1, 1, 1), False)
self.register_buffer("pixel_std", torch.tensor(pixel_std).view(-1, 1, 1), False)
@classmethod
def from_config(cls, cfg):
backbone = build_backbone(cfg)
return {
"backbone": backbone,
"proposal_generator": build_proposal_generator(cfg, backbone.output_shape()),
"pixel_mean": cfg.MODEL.PIXEL_MEAN,
"pixel_std": cfg.MODEL.PIXEL_STD,
}
@property
def device(self):
return self.pixel_mean.device
def forward(self, batched_inputs):
"""
Args:
Same as in :class:`GeneralizedRCNN.forward`
Returns:
list[dict]:
Each dict is the output for one input image.
The dict contains one key "proposals" whose value is a
:class:`Instances` with keys "proposal_boxes" and "objectness_logits".
"""
images = [x["image"].to(self.device) for x in batched_inputs]
images = [(x - self.pixel_mean) / self.pixel_std for x in images]
images = ImageList.from_tensors(images, self.backbone.size_divisibility)
features = self.backbone(images.tensor)
if "instances" in batched_inputs[0]:
gt_instances = [x["instances"].to(self.device) for x in batched_inputs]
elif "targets" in batched_inputs[0]:
log_first_n(
logging.WARN, "'targets' in the model inputs is now renamed to 'instances'!", n=10
)
gt_instances = [x["targets"].to(self.device) for x in batched_inputs]
else:
gt_instances = None
proposals, proposal_losses = self.proposal_generator(images, features, gt_instances)
# In training, the proposals are not useful at all but we generate them anyway.
# This makes RPN-only models about 5% slower.
if self.training:
return proposal_losses
processed_results = []
for results_per_image, input_per_image, image_size in zip(
proposals, batched_inputs, images.image_sizes
):
height = input_per_image.get("height", image_size[0])
width = input_per_image.get("width", image_size[1])
r = detector_postprocess(results_per_image, height, width)
processed_results.append({"proposals": r})
return processed_results
# Copyright (c) Facebook, Inc. and its affiliates.
import logging
import math
from typing import List, Tuple
import torch
from fvcore.nn import sigmoid_focal_loss_jit
from torch import Tensor, nn
from torch.nn import functional as F
from detectron2.config import configurable
from detectron2.layers import CycleBatchNormList, ShapeSpec, batched_nms, cat, get_norm
from detectron2.structures import Boxes, ImageList, Instances, pairwise_iou
from detectron2.utils.events import get_event_storage
from ..anchor_generator import build_anchor_generator
from ..backbone import Backbone, build_backbone
from ..box_regression import Box2BoxTransform, _dense_box_regression_loss
from ..matcher import Matcher
from .build import META_ARCH_REGISTRY
from .dense_detector import DenseDetector, permute_to_N_HWA_K # noqa
__all__ = ["RetinaNet"]
logger = logging.getLogger(__name__)
@META_ARCH_REGISTRY.register()
class RetinaNet(DenseDetector):
"""
Implement RetinaNet in :paper:`RetinaNet`.
"""
@configurable
def __init__(
self,
*,
backbone: Backbone,
head: nn.Module,
head_in_features,
anchor_generator,
box2box_transform,
anchor_matcher,
num_classes,
focal_loss_alpha=0.25,
focal_loss_gamma=2.0,
smooth_l1_beta=0.0,
box_reg_loss_type="smooth_l1",
test_score_thresh=0.05,
test_topk_candidates=1000,
test_nms_thresh=0.5,
max_detections_per_image=100,
pixel_mean,
pixel_std,
vis_period=0,
input_format="BGR",
):
"""
NOTE: this interface is experimental.
Args:
backbone: a backbone module, must follow detectron2's backbone interface
head (nn.Module): a module that predicts logits and regression deltas
for each level from a list of per-level features
head_in_features (Tuple[str]): Names of the input feature maps to be used in head
anchor_generator (nn.Module): a module that creates anchors from a
list of features. Usually an instance of :class:`AnchorGenerator`
box2box_transform (Box2BoxTransform): defines the transform from anchors boxes to
instance boxes
anchor_matcher (Matcher): label the anchors by matching them with ground truth.
num_classes (int): number of classes. Used to label background proposals.
# Loss parameters:
focal_loss_alpha (float): focal_loss_alpha
focal_loss_gamma (float): focal_loss_gamma
smooth_l1_beta (float): smooth_l1_beta
box_reg_loss_type (str): Options are "smooth_l1", "giou", "diou", "ciou"
# Inference parameters:
test_score_thresh (float): Inference cls score threshold, only anchors with
score > INFERENCE_TH are considered for inference (to improve speed)
test_topk_candidates (int): Select topk candidates before NMS
test_nms_thresh (float): Overlap threshold used for non-maximum suppression
(suppress boxes with IoU >= this threshold)
max_detections_per_image (int):
Maximum number of detections to return per image during inference
(100 is based on the limit established for the COCO dataset).
pixel_mean, pixel_std: see :class:`DenseDetector`.
"""
super().__init__(
backbone, head, head_in_features, pixel_mean=pixel_mean, pixel_std=pixel_std
)
self.num_classes = num_classes
# Anchors
self.anchor_generator = anchor_generator
self.box2box_transform = box2box_transform
self.anchor_matcher = anchor_matcher
# Loss parameters:
self.focal_loss_alpha = focal_loss_alpha
self.focal_loss_gamma = focal_loss_gamma
self.smooth_l1_beta = smooth_l1_beta
self.box_reg_loss_type = box_reg_loss_type
# Inference parameters:
self.test_score_thresh = test_score_thresh
self.test_topk_candidates = test_topk_candidates
self.test_nms_thresh = test_nms_thresh
self.max_detections_per_image = max_detections_per_image
# Vis parameters
self.vis_period = vis_period
self.input_format = input_format
@classmethod
def from_config(cls, cfg):
backbone = build_backbone(cfg)
backbone_shape = backbone.output_shape()
feature_shapes = [backbone_shape[f] for f in cfg.MODEL.RETINANET.IN_FEATURES]
head = RetinaNetHead(cfg, feature_shapes)
anchor_generator = build_anchor_generator(cfg, feature_shapes)
return {
"backbone": backbone,
"head": head,
"anchor_generator": anchor_generator,
"box2box_transform": Box2BoxTransform(weights=cfg.MODEL.RETINANET.BBOX_REG_WEIGHTS),
"anchor_matcher": Matcher(
cfg.MODEL.RETINANET.IOU_THRESHOLDS,
cfg.MODEL.RETINANET.IOU_LABELS,
allow_low_quality_matches=True,
),
"pixel_mean": cfg.MODEL.PIXEL_MEAN,
"pixel_std": cfg.MODEL.PIXEL_STD,
"num_classes": cfg.MODEL.RETINANET.NUM_CLASSES,
"head_in_features": cfg.MODEL.RETINANET.IN_FEATURES,
# Loss parameters:
"focal_loss_alpha": cfg.MODEL.RETINANET.FOCAL_LOSS_ALPHA,
"focal_loss_gamma": cfg.MODEL.RETINANET.FOCAL_LOSS_GAMMA,
"smooth_l1_beta": cfg.MODEL.RETINANET.SMOOTH_L1_LOSS_BETA,
"box_reg_loss_type": cfg.MODEL.RETINANET.BBOX_REG_LOSS_TYPE,
# Inference parameters:
"test_score_thresh": cfg.MODEL.RETINANET.SCORE_THRESH_TEST,
"test_topk_candidates": cfg.MODEL.RETINANET.TOPK_CANDIDATES_TEST,
"test_nms_thresh": cfg.MODEL.RETINANET.NMS_THRESH_TEST,
"max_detections_per_image": cfg.TEST.DETECTIONS_PER_IMAGE,
# Vis parameters
"vis_period": cfg.VIS_PERIOD,
"input_format": cfg.INPUT.FORMAT,
}
def forward_training(self, images, features, predictions, gt_instances):
# Transpose the Hi*Wi*A dimension to the middle:
pred_logits, pred_anchor_deltas = self._transpose_dense_predictions(
predictions, [self.num_classes, 4]
)
anchors = self.anchor_generator(features)
gt_labels, gt_boxes = self.label_anchors(anchors, gt_instances)
return self.losses(anchors, pred_logits, gt_labels, pred_anchor_deltas, gt_boxes)
def losses(self, anchors, pred_logits, gt_labels, pred_anchor_deltas, gt_boxes):
"""
Args:
anchors (list[Boxes]): a list of #feature level Boxes
gt_labels, gt_boxes: see output of :meth:`RetinaNet.label_anchors`.
Their shapes are (N, R) and (N, R, 4), respectively, where R is
the total number of anchors across levels, i.e. sum(Hi x Wi x Ai)
pred_logits, pred_anchor_deltas: both are list[Tensor]. Each element in the
list corresponds to one level and has shape (N, Hi * Wi * Ai, K or 4).
Where K is the number of classes used in `pred_logits`.
Returns:
dict[str, Tensor]:
mapping from a named loss to a scalar tensor storing the loss.
Used during training only. The dict keys are: "loss_cls" and "loss_box_reg"
"""
num_images = len(gt_labels)
gt_labels = torch.stack(gt_labels) # (N, R)
valid_mask = gt_labels >= 0
pos_mask = (gt_labels >= 0) & (gt_labels != self.num_classes)
num_pos_anchors = pos_mask.sum().item()
get_event_storage().put_scalar("num_pos_anchors", num_pos_anchors / num_images)
normalizer = self._ema_update("loss_normalizer", max(num_pos_anchors, 1), 100)
# classification and regression loss
gt_labels_target = F.one_hot(gt_labels[valid_mask], num_classes=self.num_classes + 1)[
:, :-1
] # no loss for the last (background) class
loss_cls = sigmoid_focal_loss_jit(
cat(pred_logits, dim=1)[valid_mask],
gt_labels_target.to(pred_logits[0].dtype),
alpha=self.focal_loss_alpha,
gamma=self.focal_loss_gamma,
reduction="sum",
)
loss_box_reg = _dense_box_regression_loss(
anchors,
self.box2box_transform,
pred_anchor_deltas,
gt_boxes,
pos_mask,
box_reg_loss_type=self.box_reg_loss_type,
smooth_l1_beta=self.smooth_l1_beta,
)
return {
"loss_cls": loss_cls / normalizer,
"loss_box_reg": loss_box_reg / normalizer,
}
@torch.no_grad()
def label_anchors(self, anchors, gt_instances):
"""
Args:
anchors (list[Boxes]): A list of #feature level Boxes.
The Boxes contains anchors of this image on the specific feature level.
gt_instances (list[Instances]): a list of N `Instances`s. The i-th
`Instances` contains the ground-truth per-instance annotations
for the i-th input image.
Returns:
list[Tensor]: List of #img tensors. i-th element is a vector of labels whose length is
the total number of anchors across all feature maps (sum(Hi * Wi * A)).
Label values are in {-1, 0, ..., K}, with -1 means ignore, and K means background.
list[Tensor]: i-th element is a Rx4 tensor, where R is the total number of anchors
across feature maps. The values are the matched gt boxes for each anchor.
Values are undefined for those anchors not labeled as foreground.
"""
anchors = Boxes.cat(anchors) # Rx4
gt_labels = []
matched_gt_boxes = []
for gt_per_image in gt_instances:
match_quality_matrix = pairwise_iou(gt_per_image.gt_boxes, anchors)
matched_idxs, anchor_labels = self.anchor_matcher(match_quality_matrix)
del match_quality_matrix
if len(gt_per_image) > 0:
matched_gt_boxes_i = gt_per_image.gt_boxes.tensor[matched_idxs]
gt_labels_i = gt_per_image.gt_classes[matched_idxs]
# Anchors with label 0 are treated as background.
gt_labels_i[anchor_labels == 0] = self.num_classes
# Anchors with label -1 are ignored.
gt_labels_i[anchor_labels == -1] = -1
else:
matched_gt_boxes_i = torch.zeros_like(anchors.tensor)
gt_labels_i = torch.zeros_like(matched_idxs) + self.num_classes
gt_labels.append(gt_labels_i)
matched_gt_boxes.append(matched_gt_boxes_i)
return gt_labels, matched_gt_boxes
def forward_inference(
self, images: ImageList, features: List[Tensor], predictions: List[List[Tensor]]
):
pred_logits, pred_anchor_deltas = self._transpose_dense_predictions(
predictions, [self.num_classes, 4]
)
anchors = self.anchor_generator(features)
results: List[Instances] = []
for img_idx, image_size in enumerate(images.image_sizes):
scores_per_image = [x[img_idx].sigmoid_() for x in pred_logits]
deltas_per_image = [x[img_idx] for x in pred_anchor_deltas]
results_per_image = self.inference_single_image(
anchors, scores_per_image, deltas_per_image, image_size
)
results.append(results_per_image)
return results
def inference_single_image(
self,
anchors: List[Boxes],
box_cls: List[Tensor],
box_delta: List[Tensor],
image_size: Tuple[int, int],
):
"""
Single-image inference. Return bounding-box detection results by thresholding
on scores and applying non-maximum suppression (NMS).
Arguments:
anchors (list[Boxes]): list of #feature levels. Each entry contains
a Boxes object, which contains all the anchors in that feature level.
box_cls (list[Tensor]): list of #feature levels. Each entry contains
tensor of size (H x W x A, K)
box_delta (list[Tensor]): Same shape as 'box_cls' except that K becomes 4.
image_size (tuple(H, W)): a tuple of the image height and width.
Returns:
Same as `inference`, but for only one image.
"""
pred = self._decode_multi_level_predictions(
anchors,
box_cls,
box_delta,
self.test_score_thresh,
self.test_topk_candidates,
image_size,
)
keep = batched_nms( # per-class NMS
pred.pred_boxes.tensor, pred.scores, pred.pred_classes, self.test_nms_thresh
)
return pred[keep[: self.max_detections_per_image]]
class RetinaNetHead(nn.Module):
"""
The head used in RetinaNet for object classification and box regression.
It has two subnets for the two tasks, with a common structure but separate parameters.
"""
@configurable
def __init__(
self,
*,
input_shape: List[ShapeSpec],
num_classes,
num_anchors,
conv_dims: List[int],
norm="",
prior_prob=0.01,
):
"""
NOTE: this interface is experimental.
Args:
input_shape (List[ShapeSpec]): input shape
num_classes (int): number of classes. Used to label background proposals.
num_anchors (int): number of generated anchors
conv_dims (List[int]): dimensions for each convolution layer
norm (str or callable):
Normalization for conv layers except for the two output layers.
See :func:`detectron2.layers.get_norm` for supported types.
prior_prob (float): Prior weight for computing bias
"""
super().__init__()
self._num_features = len(input_shape)
if norm == "BN" or norm == "SyncBN":
logger.info(
f"Using domain-specific {norm} in RetinaNetHead with len={self._num_features}."
)
bn_class = nn.BatchNorm2d if norm == "BN" else nn.SyncBatchNorm
def norm(c):
return CycleBatchNormList(
length=self._num_features, bn_class=bn_class, num_features=c
)
else:
norm_name = str(type(get_norm(norm, 1)))
if "BN" in norm_name:
logger.warning(
f"Shared BatchNorm (type={norm_name}) may not work well in RetinaNetHead."
)
cls_subnet = []
bbox_subnet = []
for in_channels, out_channels in zip(
[input_shape[0].channels] + list(conv_dims), conv_dims
):
cls_subnet.append(
nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
)
if norm:
cls_subnet.append(get_norm(norm, out_channels))
cls_subnet.append(nn.ReLU())
bbox_subnet.append(
nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
)
if norm:
bbox_subnet.append(get_norm(norm, out_channels))
bbox_subnet.append(nn.ReLU())
self.cls_subnet = nn.Sequential(*cls_subnet)
self.bbox_subnet = nn.Sequential(*bbox_subnet)
self.cls_score = nn.Conv2d(
conv_dims[-1], num_anchors * num_classes, kernel_size=3, stride=1, padding=1
)
self.bbox_pred = nn.Conv2d(
conv_dims[-1], num_anchors * 4, kernel_size=3, stride=1, padding=1
)
# Initialization
for modules in [self.cls_subnet, self.bbox_subnet, self.cls_score, self.bbox_pred]:
for layer in modules.modules():
if isinstance(layer, nn.Conv2d):
torch.nn.init.normal_(layer.weight, mean=0, std=0.01)
torch.nn.init.constant_(layer.bias, 0)
# Use prior in model initialization to improve stability
bias_value = -(math.log((1 - prior_prob) / prior_prob))
torch.nn.init.constant_(self.cls_score.bias, bias_value)
@classmethod
def from_config(cls, cfg, input_shape: List[ShapeSpec]):
num_anchors = build_anchor_generator(cfg, input_shape).num_cell_anchors
assert (
len(set(num_anchors)) == 1
), "Using different number of anchors between levels is not currently supported!"
num_anchors = num_anchors[0]
return {
"input_shape": input_shape,
"num_classes": cfg.MODEL.RETINANET.NUM_CLASSES,
"conv_dims": [input_shape[0].channels] * cfg.MODEL.RETINANET.NUM_CONVS,
"prior_prob": cfg.MODEL.RETINANET.PRIOR_PROB,
"norm": cfg.MODEL.RETINANET.NORM,
"num_anchors": num_anchors,
}
def forward(self, features: List[Tensor]):
"""
Arguments:
features (list[Tensor]): FPN feature map tensors in high to low resolution.
Each tensor in the list correspond to different feature levels.
Returns:
logits (list[Tensor]): #lvl tensors, each has shape (N, AxK, Hi, Wi).
The tensor predicts the classification probability
at each spatial position for each of the A anchors and K object
classes.
bbox_reg (list[Tensor]): #lvl tensors, each has shape (N, Ax4, Hi, Wi).
The tensor predicts 4-vector (dx,dy,dw,dh) box
regression values for every anchor. These values are the
relative offset between the anchor and the ground truth box.
"""
assert len(features) == self._num_features
logits = []
bbox_reg = []
for feature in features:
logits.append(self.cls_score(self.cls_subnet(feature)))
bbox_reg.append(self.bbox_pred(self.bbox_subnet(feature)))
return logits, bbox_reg
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment