import warnings
from collections import OrderedDict

import torch
from torch import nn
import torch.nn.functional as F

from torchvision.ops import misc as misc_nn_ops
from torchvision.ops import MultiScaleRoIAlign
from torchvision.ops.feature_pyramid_network import FeaturePyramidNetwork, LastLevelMaxPool

from .generalized_rcnn import GeneralizedRCNN
from .rpn import AnchorGenerator, RPNHead, RegionProposalNetwork
from .roi_heads import RoIHeads
from .transform import GeneralizedRCNNTransform
from .._utils import IntermediateLayerGetter


__all__ = [
    "FasterRCNN", "MaskRCNN", "fasterrcnn_resnet50_fpn", "maskrcnn_resnet50_fpn",
    "KeypointRCNN", "keypointrcnn_resnet50_fpn"
]


class BackboneWithFPN(nn.Sequential):
    """
    Sequential container chaining an ``IntermediateLayerGetter`` ("body")
    with a ``FeaturePyramidNetwork`` ("fpn").

    Arguments:
        backbone (nn.Module): network wrapped by ``IntermediateLayerGetter``
        return_layers (dict): forwarded to ``IntermediateLayerGetter`` as
            ``return_layers``
        in_channels_list (list[int]): forwarded to ``FeaturePyramidNetwork``
        out_channels (int): forwarded to ``FeaturePyramidNetwork``; also
            stored as ``self.out_channels``
    """

    def __init__(self, backbone, return_layers, in_channels_list, out_channels):
        body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        fpn = FeaturePyramidNetwork(
            in_channels_list=in_channels_list,
            out_channels=out_channels,
            extra_blocks=LastLevelMaxPool(),
        )
        super(BackboneWithFPN, self).__init__(OrderedDict(
            [("body", body), ("fpn", fpn)]))
        # FasterRCNN requires its backbone to expose this attribute.
        self.out_channels = out_channels


class FasterRCNN(GeneralizedRCNN):
    """
    Assembles a ``GeneralizedRCNN`` from a backbone, a
    ``RegionProposalNetwork``, ``RoIHeads`` and a ``GeneralizedRCNNTransform``.

    Every component left as ``None`` is replaced by a default built here.

    Arguments:
        backbone (nn.Module): must expose an ``out_channels`` attribute.
        num_classes (int): passed to the default ``FastRCNNPredictor``.
            Must be ``None`` when ``box_predictor`` is given, and must not be
            ``None`` otherwise.
        min_size, max_size, image_mean, image_std: forwarded to
            ``GeneralizedRCNNTransform``. ``image_mean`` / ``image_std``
            default to ``[0.485, 0.456, 0.406]`` / ``[0.229, 0.224, 0.225]``.
        rpn_anchor_generator (AnchorGenerator): defaults to five sizes
            (32..512), each with aspect ratios (0.5, 1.0, 2.0).
        rpn_head (nn.Module): defaults to an ``RPNHead`` on ``out_channels``.
        rpn_pre_nms_top_n_train, rpn_pre_nms_top_n_test,
        rpn_post_nms_top_n_train, rpn_post_nms_top_n_test: packed into
            ``dict(training=..., testing=...)`` for ``RegionProposalNetwork``.
        rpn_nms_thresh, rpn_fg_iou_thresh, rpn_bg_iou_thresh,
        rpn_batch_size_per_image, rpn_positive_fraction: forwarded to
            ``RegionProposalNetwork``.
        box_roi_pool (MultiScaleRoIAlign): defaults to feature maps
            ``[0, 1, 2, 3]``, ``output_size=7``, ``sampling_ratio=2``.
        box_head (nn.Module): defaults to a ``TwoMLPHead``.
        box_predictor (nn.Module): defaults to a ``FastRCNNPredictor``.
        box_score_thresh, box_nms_thresh, box_detections_per_img,
        box_fg_iou_thresh, box_bg_iou_thresh, box_batch_size_per_image,
        box_positive_fraction, bbox_reg_weights: forwarded to ``RoIHeads``.

    Raises:
        ValueError: if ``backbone`` has no ``out_channels`` attribute, or if
            ``num_classes`` and ``box_predictor`` are both given or both
            ``None``.
    """

    def __init__(self, backbone, num_classes=None,
                 # transform parameters
                 min_size=800, max_size=1333,
                 image_mean=None, image_std=None,
                 # RPN parameters
                 rpn_anchor_generator=None, rpn_head=None,
                 rpn_pre_nms_top_n_train=2000, rpn_pre_nms_top_n_test=1000,
                 rpn_post_nms_top_n_train=2000, rpn_post_nms_top_n_test=1000,
                 rpn_nms_thresh=0.7,
                 rpn_fg_iou_thresh=0.7, rpn_bg_iou_thresh=0.3,
                 rpn_batch_size_per_image=256, rpn_positive_fraction=0.5,
                 # Box parameters
                 box_roi_pool=None, box_head=None, box_predictor=None,
                 box_score_thresh=0.05, box_nms_thresh=0.5, box_detections_per_img=100,
                 box_fg_iou_thresh=0.5, box_bg_iou_thresh=0.5,
                 box_batch_size_per_image=512, box_positive_fraction=0.25,
                 bbox_reg_weights=None):

        if not hasattr(backbone, "out_channels"):
            raise ValueError(
                "backbone should contain an attribute out_channels "
                "specifying the number of output channels (assumed to be the "
                "same for all the levels)")

        assert isinstance(rpn_anchor_generator, (AnchorGenerator, type(None)))
        assert isinstance(box_roi_pool, (MultiScaleRoIAlign, type(None)))

        # Exactly one of num_classes / box_predictor must be provided.
        if num_classes is not None:
            if box_predictor is not None:
                raise ValueError("num_classes should be None when box_predictor is specified")
        else:
            if box_predictor is None:
                raise ValueError("num_classes should not be None when box_predictor "
                                 "is not specified")

        out_channels = backbone.out_channels

        if rpn_anchor_generator is None:
            anchor_sizes = ((32,), (64,), (128,), (256,), (512,))
            aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)
            rpn_anchor_generator = AnchorGenerator(
                anchor_sizes, aspect_ratios
            )
        if rpn_head is None:
            rpn_head = RPNHead(
                out_channels, rpn_anchor_generator.num_anchors_per_location()[0]
            )

        rpn_pre_nms_top_n = dict(training=rpn_pre_nms_top_n_train, testing=rpn_pre_nms_top_n_test)
        rpn_post_nms_top_n = dict(training=rpn_post_nms_top_n_train, testing=rpn_post_nms_top_n_test)

        rpn = RegionProposalNetwork(
            rpn_anchor_generator, rpn_head,
            rpn_fg_iou_thresh, rpn_bg_iou_thresh,
            rpn_batch_size_per_image, rpn_positive_fraction,
            rpn_pre_nms_top_n, rpn_post_nms_top_n, rpn_nms_thresh)

        if box_roi_pool is None:
            box_roi_pool = MultiScaleRoIAlign(
                featmap_names=[0, 1, 2, 3],
                output_size=7,
                sampling_ratio=2)

        # Width shared by the default box head output and the default
        # box predictor input.
        representation_size = 1024

        if box_head is None:
            resolution = box_roi_pool.output_size[0]
            # The head flattens its input, hence channels * resolution^2.
            box_head = TwoMLPHead(
                out_channels * resolution ** 2,
                representation_size)

        if box_predictor is None:
            box_predictor = FastRCNNPredictor(
                representation_size,
                num_classes)

        roi_heads = RoIHeads(
            # Box
            box_roi_pool, box_head, box_predictor,
            box_fg_iou_thresh, box_bg_iou_thresh,
            box_batch_size_per_image, box_positive_fraction,
            bbox_reg_weights,
            box_score_thresh, box_nms_thresh, box_detections_per_img)

        if image_mean is None:
            image_mean = [0.485, 0.456, 0.406]
        if image_std is None:
            image_std = [0.229, 0.224, 0.225]
        transform = GeneralizedRCNNTransform(min_size, max_size, image_mean, image_std)

        super(FasterRCNN, self).__init__(backbone, rpn, roi_heads, transform)


class MaskRCNN(FasterRCNN):
    """
    ``FasterRCNN`` with a mask branch attached to ``self.roi_heads``.

    All non-mask arguments are forwarded unchanged to ``FasterRCNN``.

    Arguments:
        mask_roi_pool (MultiScaleRoIAlign): defaults to feature maps
            ``[0, 1, 2, 3]``, ``output_size=14``, ``sampling_ratio=2``.
        mask_head (nn.Module): defaults to ``MaskRCNNHeads`` with four
            256-channel layers and dilation 1.
        mask_predictor (nn.Module): defaults to a ``MaskRCNNC4Predictor``
            producing ``num_classes`` channels. ``num_classes`` must be
            ``None`` when this is given.
        mask_discretization_size (int): stored on ``self.roi_heads``.

    Raises:
        ValueError: if both ``num_classes`` and ``mask_predictor`` are given.
    """

    def __init__(self, backbone, num_classes=None,
                 # transform parameters
                 min_size=800, max_size=1333,
                 image_mean=None, image_std=None,
                 # RPN parameters
                 rpn_anchor_generator=None, rpn_head=None,
                 rpn_pre_nms_top_n_train=2000, rpn_pre_nms_top_n_test=1000,
                 rpn_post_nms_top_n_train=2000, rpn_post_nms_top_n_test=1000,
                 rpn_nms_thresh=0.7,
                 rpn_fg_iou_thresh=0.7, rpn_bg_iou_thresh=0.3,
                 rpn_batch_size_per_image=256, rpn_positive_fraction=0.5,
                 # Box parameters
                 box_roi_pool=None, box_head=None, box_predictor=None,
                 box_score_thresh=0.05, box_nms_thresh=0.5, box_detections_per_img=100,
                 box_fg_iou_thresh=0.5, box_bg_iou_thresh=0.5,
                 box_batch_size_per_image=512, box_positive_fraction=0.25,
                 bbox_reg_weights=None,
                 # Mask parameters
                 mask_roi_pool=None, mask_head=None, mask_predictor=None,
                 mask_discretization_size=28):

        assert isinstance(mask_roi_pool, (MultiScaleRoIAlign, type(None)))

        if num_classes is not None:
            if mask_predictor is not None:
                raise ValueError("num_classes should be None when mask_predictor is specified")

        out_channels = backbone.out_channels

        if mask_roi_pool is None:
            mask_roi_pool = MultiScaleRoIAlign(
                featmap_names=[0, 1, 2, 3],
                output_size=14,
                sampling_ratio=2)

        # Layer widths of the default mask head.
        mask_layers = (256, 256, 256, 256)

        if mask_head is None:
            mask_dilation = 1
            mask_head = MaskRCNNHeads(out_channels, mask_layers, mask_dilation)

        if mask_predictor is None:
            # The predictor consumes the mask head's output, so its input
            # width is the head's last layer, not the backbone's
            # out_channels (the two only coincide for 256-channel backbones).
            mask_predictor_in_channels = mask_layers[-1]
            mask_dim_reduced = 256
            mask_predictor = MaskRCNNC4Predictor(mask_predictor_in_channels,
                                                 mask_dim_reduced, num_classes)

        super(MaskRCNN, self).__init__(
            backbone, num_classes,
            # transform parameters
            min_size, max_size,
            image_mean, image_std,
            # RPN-specific parameters
            rpn_anchor_generator, rpn_head,
            rpn_pre_nms_top_n_train, rpn_pre_nms_top_n_test,
            rpn_post_nms_top_n_train, rpn_post_nms_top_n_test,
            rpn_nms_thresh,
            rpn_fg_iou_thresh, rpn_bg_iou_thresh,
            rpn_batch_size_per_image, rpn_positive_fraction,
            # Box parameters
            box_roi_pool, box_head, box_predictor,
            box_score_thresh, box_nms_thresh, box_detections_per_img,
            box_fg_iou_thresh, box_bg_iou_thresh,
            box_batch_size_per_image, box_positive_fraction,
            bbox_reg_weights)

        # The mask branch is attached after construction because
        # self.roi_heads only exists once the parent __init__ has run.
        self.roi_heads.mask_roi_pool = mask_roi_pool
        self.roi_heads.mask_head = mask_head
        self.roi_heads.mask_predictor = mask_predictor
        self.roi_heads.mask_discretization_size = mask_discretization_size


class KeypointRCNN(FasterRCNN):
    """
    ``FasterRCNN`` with a keypoint branch attached to ``self.roi_heads``.

    All non-keypoint arguments are forwarded unchanged to ``FasterRCNN``.

    Arguments:
        keypoint_roi_pool (MultiScaleRoIAlign): defaults to feature maps
            ``[0, 1, 2, 3]``, ``output_size=14``, ``sampling_ratio=2``.
        keypoint_head (nn.Module): defaults to ``KeypointRCNNHeads`` with
            eight 512-channel layers.
        keypoint_predictor (nn.Module): defaults to a
            ``KeypointRCNNPredictor`` with ``num_keypoints`` outputs.
        keypoint_discretization_size (int): stored on ``self.roi_heads``.
        num_keypoints (int): only used to build the default
            ``keypoint_predictor``; ignored when one is supplied.
    """

    def __init__(self, backbone, num_classes=None,
                 # transform parameters
                 min_size=800, max_size=1333,
                 image_mean=None, image_std=None,
                 # RPN parameters
                 rpn_anchor_generator=None, rpn_head=None,
                 rpn_pre_nms_top_n_train=2000, rpn_pre_nms_top_n_test=1000,
                 rpn_post_nms_top_n_train=2000, rpn_post_nms_top_n_test=1000,
                 rpn_nms_thresh=0.7,
                 rpn_fg_iou_thresh=0.7, rpn_bg_iou_thresh=0.3,
                 rpn_batch_size_per_image=256, rpn_positive_fraction=0.5,
                 # Box parameters
                 box_roi_pool=None, box_head=None, box_predictor=None,
                 box_score_thresh=0.05, box_nms_thresh=0.5, box_detections_per_img=100,
                 box_fg_iou_thresh=0.5, box_bg_iou_thresh=0.5,
                 box_batch_size_per_image=512, box_positive_fraction=0.25,
                 bbox_reg_weights=None,
                 # keypoint parameters
                 keypoint_roi_pool=None, keypoint_head=None, keypoint_predictor=None,
                 keypoint_discretization_size=56,
                 num_keypoints=17):

        assert isinstance(keypoint_roi_pool, (MultiScaleRoIAlign, type(None)))

        # NOTE: the keypoint predictor is sized by num_keypoints, not by
        # num_classes, so a custom keypoint_predictor is accepted together
        # with num_classes (box-level validation is done by FasterRCNN).

        out_channels = backbone.out_channels

        if keypoint_roi_pool is None:
            keypoint_roi_pool = MultiScaleRoIAlign(
                featmap_names=[0, 1, 2, 3],
                output_size=14,
                sampling_ratio=2)

        # Layer widths of the default keypoint head.
        keypoint_layers = tuple(512 for _ in range(8))

        if keypoint_head is None:
            keypoint_head = KeypointRCNNHeads(out_channels, keypoint_layers)

        if keypoint_predictor is None:
            keypoint_dim_reduced = keypoint_layers[-1]
            keypoint_predictor = KeypointRCNNPredictor(keypoint_dim_reduced, num_keypoints)

        super(KeypointRCNN, self).__init__(
            backbone, num_classes,
            # transform parameters
            min_size, max_size,
            image_mean, image_std,
            # RPN-specific parameters
            rpn_anchor_generator, rpn_head,
            rpn_pre_nms_top_n_train, rpn_pre_nms_top_n_test,
            rpn_post_nms_top_n_train, rpn_post_nms_top_n_test,
            rpn_nms_thresh,
            rpn_fg_iou_thresh, rpn_bg_iou_thresh,
            rpn_batch_size_per_image, rpn_positive_fraction,
            # Box parameters
            box_roi_pool, box_head, box_predictor,
            box_score_thresh, box_nms_thresh, box_detections_per_img,
            box_fg_iou_thresh, box_bg_iou_thresh,
            box_batch_size_per_image, box_positive_fraction,
            bbox_reg_weights)

        # The keypoint branch is attached after construction because
        # self.roi_heads only exists once the parent __init__ has run.
        self.roi_heads.keypoint_roi_pool = keypoint_roi_pool
        self.roi_heads.keypoint_head = keypoint_head
        self.roi_heads.keypoint_predictor = keypoint_predictor
        self.roi_heads.keypoint_discretization_size = keypoint_discretization_size


class TwoMLPHead(nn.Module):
    """
    Box head made of two fully connected layers, each followed by a ReLU.

    Arguments:
        in_channels (int): number of input features after flattening
        representation_size (int): output width of both linear layers
    """

    def __init__(self, in_channels, representation_size):
        super(TwoMLPHead, self).__init__()

        self.fc6 = nn.Linear(in_channels, representation_size)
        self.fc7 = nn.Linear(representation_size, representation_size)

    def forward(self, x):
        # Collapse everything but the first dimension before the linear layers.
        x = x.flatten(start_dim=1)

        x = F.relu(self.fc6(x))
        x = F.relu(self.fc7(x))

        return x


class FastRCNNPredictor(nn.Module):
    """
    Two parallel linear layers producing class scores and box deltas.

    Arguments:
        in_channels (int): number of input features
        num_classes (int): number of score outputs; the box regressor
            outputs ``num_classes * 4`` values
    """

    def __init__(self, in_channels, num_classes):
        super(FastRCNNPredictor, self).__init__()
        self.cls_score = nn.Linear(in_channels, num_classes)
        self.bbox_pred = nn.Linear(in_channels, num_classes * 4)

    def forward(self, x):
        # A 4-d input is only accepted when its trailing two dims are 1x1.
        if x.ndimension() == 4:
            assert list(x.shape[2:]) == [1, 1]
        x = x.flatten(start_dim=1)
        scores = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)

        return scores, bbox_deltas


class MaskRCNNHeads(nn.Sequential):
    def __init__(self, in_channels, layers, dilation):
        """
        Stack of 3x3 convolutions, each followed by an in-place ReLU.

        Arguments:
            in_channels (int): number of input channels of the first conv
            layers (iterable[int]): output channels of each conv, in order
            dilation (int): dilation of every conv; also used as its padding
        """
        d = OrderedDict()
        next_feature = in_channels
        # Layers are numbered from 1: mask_fcn1/relu1, mask_fcn2/relu2, ...
        for layer_idx, layer_features in enumerate(layers, 1):
            d["mask_fcn{}".format(layer_idx)] = misc_nn_ops.Conv2d(
                next_feature, layer_features, kernel_size=3,
                stride=1, padding=dilation, dilation=dilation)
            d["relu{}".format(layer_idx)] = nn.ReLU(inplace=True)
            next_feature = layer_features

        super(MaskRCNNHeads, self).__init__(d)
        # Only weights are re-initialised; biases keep the layers' own init.
        for name, param in self.named_parameters():
            if "weight" in name:
                nn.init.kaiming_normal_(param, mode="fan_out", nonlinearity="relu")


class MaskRCNNC4Predictor(nn.Sequential):
    """
    Transposed convolution (kernel 2, stride 2), ReLU, then a 1x1
    convolution producing ``num_classes`` channels.

    Arguments:
        in_channels (int): number of input channels
        dim_reduced (int): channels produced by the transposed convolution
        num_classes (int): number of output channels
    """

    def __init__(self, in_channels, dim_reduced, num_classes):
        super(MaskRCNNC4Predictor, self).__init__(OrderedDict([
            ("conv5_mask", misc_nn_ops.ConvTranspose2d(in_channels, dim_reduced, 2, 2, 0)),
            ("relu", nn.ReLU(inplace=True)),
            ("mask_fcn_logits", misc_nn_ops.Conv2d(dim_reduced, num_classes, 1, 1, 0)),
        ]))

        # Only weights are re-initialised; biases keep the layers' own init.
        for name, param in self.named_parameters():
            if "weight" in name:
                nn.init.kaiming_normal_(param, mode="fan_out", nonlinearity="relu")


class KeypointRCNNHeads(nn.Sequential):
    """
    Stack of 3x3 convolutions (stride 1, padding 1), each followed by an
    in-place ReLU.

    Arguments:
        in_channels (int): number of input channels of the first conv
        layers (iterable[int]): output channels of each conv, in order
    """

    def __init__(self, in_channels, layers):
        d = []
        next_feature = in_channels
        for layer_features in layers:
            d.append(misc_nn_ops.Conv2d(next_feature, layer_features, 3, stride=1, padding=1))
            d.append(nn.ReLU(inplace=True))
            next_feature = layer_features
        super(KeypointRCNNHeads, self).__init__(*d)
        for m in self.children():
            if isinstance(m, misc_nn_ops.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
                nn.init.constant_(m.bias, 0)


class KeypointRCNNPredictor(nn.Module):
    """
    Transposed convolution followed by a bilinear upsampling.

    Arguments:
        in_channels (int): number of input channels
        num_keypoints (int): number of output channels; also stored as
            ``self.out_channels``
    """

    def __init__(self, in_channels, num_keypoints):
        super(KeypointRCNNPredictor, self).__init__()
        input_features = in_channels
        deconv_kernel = 4
        self.kps_score_lowres = misc_nn_ops.ConvTranspose2d(
            input_features,
            num_keypoints,
            deconv_kernel,
            stride=2,
            padding=deconv_kernel // 2 - 1,
        )
        nn.init.kaiming_normal_(
            self.kps_score_lowres.weight, mode="fan_out", nonlinearity="relu"
        )
        nn.init.constant_(self.kps_score_lowres.bias, 0)
        # Scale factor applied by the interpolation in forward().
        self.up_scale = 2
        self.out_channels = num_keypoints

    def forward(self, x):
        x = self.kps_score_lowres(x)
        x = misc_nn_ops.interpolate(
            x, scale_factor=self.up_scale, mode="bilinear", align_corners=False
        )
        return x


def _warn_pretrained_unavailable(builder_name):
    """Warn that ``pretrained=True`` has no effect for ``builder_name``."""
    warnings.warn(
        "{}: pretrained=True is not supported yet and is ignored; no "
        "pretrained detection weights are loaded".format(builder_name),
        stacklevel=3,
    )


def _resnet_fpn_backbone(backbone_name, pretrained):
    """
    Build a ``BackboneWithFPN`` on top of a ResNet from the sibling
    ``resnet`` module.

    Arguments:
        backbone_name (str): name of the constructor looked up in ``resnet``
        pretrained (bool): forwarded to that constructor

    Returns:
        BackboneWithFPN: backbone with ``out_channels == 256``
    """
    from .. import resnet
    backbone = resnet.__dict__[backbone_name](
        pretrained=pretrained,
        norm_layer=misc_nn_ops.FrozenBatchNorm2d)
    # freeze layers: every parameter outside layer2/layer3/layer4
    for name, parameter in backbone.named_parameters():
        if 'layer2' not in name and 'layer3' not in name and 'layer4' not in name:
            parameter.requires_grad_(False)

    return_layers = {'layer1': 0, 'layer2': 1, 'layer3': 2, 'layer4': 3}

    # Channel count doubles at each returned layer: 256, 512, 1024, 2048.
    in_channels_stage2 = 256
    in_channels_list = [
        in_channels_stage2,
        in_channels_stage2 * 2,
        in_channels_stage2 * 4,
        in_channels_stage2 * 8,
    ]
    out_channels = 256
    return BackboneWithFPN(backbone, return_layers, in_channels_list, out_channels)


def fasterrcnn_resnet50_fpn(pretrained=False, num_classes=81, pretrained_backbone=True, **kwargs):
    """
    Build a ``FasterRCNN`` with a ResNet-50 FPN backbone.

    Arguments:
        pretrained (bool): not supported yet; ``True`` only emits a warning
        num_classes (int): forwarded to ``FasterRCNN``
        pretrained_backbone (bool): forwarded to the ResNet constructor
        **kwargs: forwarded to ``FasterRCNN``
    """
    backbone = _resnet_fpn_backbone('resnet50', pretrained_backbone)
    model = FasterRCNN(backbone, num_classes, **kwargs)
    if pretrained:
        _warn_pretrained_unavailable("fasterrcnn_resnet50_fpn")
    return model


def maskrcnn_resnet50_fpn(pretrained=False, num_classes=81, pretrained_backbone=True, **kwargs):
    """
    Build a ``MaskRCNN`` with a ResNet-50 FPN backbone.

    Arguments:
        pretrained (bool): not supported yet; ``True`` only emits a warning
        num_classes (int): forwarded to ``MaskRCNN``
        pretrained_backbone (bool): forwarded to the ResNet constructor
        **kwargs: forwarded to ``MaskRCNN``
    """
    backbone = _resnet_fpn_backbone('resnet50', pretrained_backbone)
    model = MaskRCNN(backbone, num_classes, **kwargs)
    if pretrained:
        _warn_pretrained_unavailable("maskrcnn_resnet50_fpn")
    return model


def keypointrcnn_resnet50_fpn(pretrained=False, num_classes=2, num_keypoints=17,
                              pretrained_backbone=True, **kwargs):
    """
    Build a ``KeypointRCNN`` with a ResNet-50 FPN backbone.

    Arguments:
        pretrained (bool): not supported yet; ``True`` only emits a warning
        num_classes (int): forwarded to ``KeypointRCNN``
        num_keypoints (int): forwarded to ``KeypointRCNN``
        pretrained_backbone (bool): forwarded to the ResNet constructor
        **kwargs: forwarded to ``KeypointRCNN``
    """
    backbone = _resnet_fpn_backbone('resnet50', pretrained_backbone)
    model = KeypointRCNN(backbone, num_classes, num_keypoints=num_keypoints, **kwargs)
    if pretrained:
        _warn_pretrained_unavailable("keypointrcnn_resnet50_fpn")
    return model