import warnings
from torch import nn
from torchvision.ops.feature_pyramid_network import FeaturePyramidNetwork, LastLevelMaxPool

from torchvision.ops import misc as misc_nn_ops
from .._utils import IntermediateLayerGetter
from .. import mobilenet
from .. import resnet


class BackboneWithFPN(nn.Module):
    """
    Adds a FPN on top of a model.
    Internally, it uses torchvision.models._utils.IntermediateLayerGetter to
    extract a submodel that returns the feature maps specified in return_layers.
    The same limitations of IntermediateLayerGetter apply here.

    Args:
        backbone (nn.Module)
        return_layers (Dict[name, new_name]): a dict containing the names
            of the modules for which the activations will be returned as
            the key of the dict, and the value of the dict is the name
            of the returned activation (which the user can specify).
        in_channels_list (List[int]): number of channels for each feature map
            that is returned, in the order they are present in the OrderedDict
        out_channels (int): number of channels in the FPN.
        extra_blocks (optional): extra block forwarded to the
            FeaturePyramidNetwork. When None, a ``LastLevelMaxPool`` is used.

    Attributes:
        out_channels (int): the number of channels in the FPN
    """

    def __init__(self, backbone, return_layers, in_channels_list, out_channels, extra_blocks=None):
        super().__init__()

        if extra_blocks is None:
            extra_blocks = LastLevelMaxPool()

        self.body = IntermediateLayerGetter(backbone, return_layers=return_layers)
        self.fpn = FeaturePyramidNetwork(
            in_channels_list=in_channels_list,
            out_channels=out_channels,
            extra_blocks=extra_blocks,
        )
        self.out_channels = out_channels

    def forward(self, x):
        """Run ``x`` through the layer getter, then through the FPN, and return the FPN output."""
        x = self.body(x)
        x = self.fpn(x)
        return x


def resnet_fpn_backbone(
    backbone_name,
    pretrained,
    norm_layer=misc_nn_ops.FrozenBatchNorm2d,
    trainable_layers=3,
    returned_layers=None,
    extra_blocks=None
):
    """
    Constructs a specified ResNet backbone with FPN on top. Freezes the specified number of layers in the backbone.

    Examples::

        >>> from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
        >>> backbone = resnet_fpn_backbone('resnet50', pretrained=True, trainable_layers=3)
        >>> # get some dummy image
        >>> x = torch.rand(1,3,64,64)
        >>> # compute the output
        >>> output = backbone(x)
        >>> print([(k, v.shape) for k, v in output.items()])
        >>> # returns
        >>>   [('0', torch.Size([1, 256, 16, 16])),
        >>>    ('1', torch.Size([1, 256, 8, 8])),
        >>>    ('2', torch.Size([1, 256, 4, 4])),
        >>>    ('3', torch.Size([1, 256, 2, 2])),
        >>>    ('pool', torch.Size([1, 256, 1, 1]))]

    Args:
        backbone_name (string): resnet architecture. Possible values are 'ResNet', 'resnet18', 'resnet34', 'resnet50',
             'resnet101', 'resnet152', 'resnext50_32x4d', 'resnext101_32x8d', 'wide_resnet50_2', 'wide_resnet101_2'
        norm_layer (torchvision.ops): it is recommended to use the default value. For details visit:
            (https://github.com/facebookresearch/maskrcnn-benchmark/issues/267)
        pretrained (bool): If True, returns a model with backbone pre-trained on Imagenet
        trainable_layers (int): number of trainable (not frozen) resnet layers starting from final block.
            Valid values are between 0 and 5, with 5 meaning all backbone layers are trainable.
        returned_layers (list of int): indices of the ``layer{k}`` blocks whose outputs are fed to the FPN.
            Each entry must be in ``[1, 4]``. Defaults to ``[1, 2, 3, 4]``.
        extra_blocks (optional): extra block forwarded to the FPN. When None, a ``LastLevelMaxPool`` is used.

    Returns:
        BackboneWithFPN: the backbone wrapped with a 256-channel FPN.
    """
    backbone = resnet.__dict__[backbone_name](
        pretrained=pretrained,
        norm_layer=norm_layer)

    # select layers that won't be frozen
    assert 0 <= trainable_layers <= 5
    layers_to_train = ['layer4', 'layer3', 'layer2', 'layer1', 'conv1'][:trainable_layers]
    if trainable_layers == 5:
        # "5" is documented as "everything trainable". The stem norm layer
        # (named `bn1` in torchvision ResNets) matches none of the prefixes
        # above, so its parameters would stay frozen whenever a norm layer
        # with learnable parameters is supplied.
        layers_to_train.append('bn1')

    # Freeze every parameter whose name does not start with a trainable prefix.
    # NOTE: this happens regardless of `pretrained`; callers are expected to
    # pick `trainable_layers` accordingly (see _validate_trainable_layers).
    trainable_prefixes = tuple(layers_to_train)
    for name, parameter in backbone.named_parameters():
        # startswith(()) is False, so an empty selection freezes everything
        if not name.startswith(trainable_prefixes):
            parameter.requires_grad_(False)

    if extra_blocks is None:
        extra_blocks = LastLevelMaxPool()

    if returned_layers is None:
        returned_layers = [1, 2, 3, 4]
    assert min(returned_layers) > 0 and max(returned_layers) < 5
    # map 'layer{k}' -> '0', '1', ... in the order given by returned_layers
    return_layers = {f'layer{k}': str(v) for v, k in enumerate(returned_layers)}

    # channel count of layer1's output; each subsequent layer doubles it
    in_channels_stage2 = backbone.inplanes // 8
    in_channels_list = [in_channels_stage2 * 2 ** (i - 1) for i in returned_layers]
    out_channels = 256
    return BackboneWithFPN(backbone, return_layers, in_channels_list, out_channels, extra_blocks=extra_blocks)


def _validate_trainable_layers(pretrained, trainable_backbone_layers, max_value, default_value):
    """
    Resolve the number of trainable backbone layers.

    Args:
        pretrained (bool): whether a pretrained model or backbone is used.
        trainable_backbone_layers (int or None): value requested by the caller.
        max_value (int): largest admissible value, meaning all layers are trainable.
        default_value (int): value used when ``pretrained`` is truthy and the
            caller passed None.

    Returns:
        int: ``max_value`` when ``pretrained`` is falsy (a warning is emitted if
        the caller had passed an explicit value), ``default_value`` when the
        caller passed None, otherwise the caller's value.

    Raises:
        AssertionError: if the resolved value is outside ``[0, max_value]``.
    """
    # don't freeze any layers if pretrained model or backbone is not used
    if not pretrained:
        if trainable_backbone_layers is not None:
            warnings.warn(
                "Changing trainable_backbone_layers has not effect if "
                "neither pretrained nor pretrained_backbone have been set to True, "
                "falling back to trainable_backbone_layers={} so that all layers are trainable".format(max_value))
        trainable_backbone_layers = max_value

    # by default freeze first blocks
    if trainable_backbone_layers is None:
        trainable_backbone_layers = default_value
    assert 0 <= trainable_backbone_layers <= max_value
    return trainable_backbone_layers


def mobilenet_backbone(
    backbone_name,
    pretrained,
    fpn,
    norm_layer=misc_nn_ops.FrozenBatchNorm2d,
    trainable_layers=2,
    returned_layers=None,
    extra_blocks=None
):
    """
    Constructs a specified MobileNet backbone, optionally with FPN on top.
    Freezes the blocks that precede the last ``trainable_layers`` stages.

    Args:
        backbone_name (string): name of the constructor looked up in the ``mobilenet`` module.
        pretrained (bool): forwarded to the constructor.
        fpn (bool): If True, wrap the features in a ``BackboneWithFPN``; otherwise append a
            1x1 convolution that maps the last block's channels to 256.
        norm_layer: forwarded to the constructor.
        trainable_layers (int): number of trainable (not frozen) stages counted from the end.
            Valid values are between 0 and the number of stages; 0 freezes every block.
        returned_layers (list of int): stage indices fed to the FPN (only used when ``fpn`` is True).
            Defaults to the last two stages.
        extra_blocks (optional): extra block forwarded to the FPN (only used when ``fpn`` is True).
            When None, a ``LastLevelMaxPool`` is used.

    Returns:
        nn.Module: a module exposing an ``out_channels`` attribute equal to 256.
    """
    backbone = mobilenet.__dict__[backbone_name](pretrained=pretrained, norm_layer=norm_layer).features

    # Gather the indices of blocks which are strided. These are the locations of C1, ..., Cn-1 blocks.
    # The first and last blocks are always included because they are the C0 (conv1) and Cn.
    stage_indices = [0] + [i for i, b in enumerate(backbone) if getattr(b, "is_strided", False)] + [len(backbone) - 1]
    num_stages = len(stage_indices)

    # find the index of the layer from which we won't freeze
    assert 0 <= trainable_layers <= num_stages
    # `freeze_before` is an index into the backbone blocks, so "freeze everything"
    # must be len(backbone), not the number of stages.
    freeze_before = len(backbone) if trainable_layers == 0 else stage_indices[num_stages - trainable_layers]

    # Freeze every block before `freeze_before`.
    # NOTE: this happens regardless of `pretrained`.
    for b in backbone[:freeze_before]:
        for parameter in b.parameters():
            parameter.requires_grad_(False)

    out_channels = 256
    if fpn:
        if extra_blocks is None:
            extra_blocks = LastLevelMaxPool()

        if returned_layers is None:
            returned_layers = [num_stages - 2, num_stages - 1]
        assert min(returned_layers) >= 0 and max(returned_layers) < num_stages
        # map the block index of each selected stage -> '0', '1', ...
        return_layers = {f'{stage_indices[k]}': str(v) for v, k in enumerate(returned_layers)}

        in_channels_list = [backbone[stage_indices[i]].out_channels for i in returned_layers]
        return BackboneWithFPN(backbone, return_layers, in_channels_list, out_channels, extra_blocks=extra_blocks)
    else:
        m = nn.Sequential(
            backbone,
            # depthwise linear combination of channels to reduce their size
            nn.Conv2d(backbone[-1].out_channels, out_channels, 1),
        )
        m.out_channels = out_channels
        return m