# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import paddle import paddle.nn as nn from paddle import ParamAttr import paddle.nn.functional as F from paddle.nn.initializer import Normal, Constant from paddle.regularizer import L2Decay from ppdet.core.workspace import register, serializable from . import ops from .initializer import xavier_uniform_, constant_ from paddle.vision.ops import DeformConv2D def _to_list(l): if isinstance(l, (list, tuple)): return list(l) return [l] class DeformableConvV2(nn.Layer): def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, weight_attr=None, bias_attr=None, lr_scale=1, regularizer=None, skip_quant=False, dcn_bias_regularizer=L2Decay(0.), dcn_bias_lr_scale=2.): super(DeformableConvV2, self).__init__() self.offset_channel = 2 * kernel_size**2 self.mask_channel = kernel_size**2 if lr_scale == 1 and regularizer is None: offset_bias_attr = ParamAttr(initializer=Constant(0.)) else: offset_bias_attr = ParamAttr( initializer=Constant(0.), learning_rate=lr_scale, regularizer=regularizer) self.conv_offset = nn.Conv2D( in_channels, 3 * kernel_size**2, kernel_size, stride=stride, padding=(kernel_size - 1) // 2, weight_attr=ParamAttr(initializer=Constant(0.0)), bias_attr=offset_bias_attr) if skip_quant: self.conv_offset.skip_quant = True if bias_attr: # in FCOS-DCN head, specifically need learning_rate and regularizer dcn_bias_attr = ParamAttr( initializer=Constant(value=0), regularizer=dcn_bias_regularizer, learning_rate=dcn_bias_lr_scale) else: # in ResNet backbone, do not need bias dcn_bias_attr = False self.conv_dcn = DeformConv2D( in_channels, out_channels, kernel_size, stride=stride, padding=(kernel_size - 1) // 2 * dilation, dilation=dilation, groups=groups, weight_attr=weight_attr, bias_attr=dcn_bias_attr) def forward(self, x): offset_mask = self.conv_offset(x) offset, mask = paddle.split( offset_mask, num_or_sections=[self.offset_channel, self.mask_channel], axis=1) mask = F.sigmoid(mask) y = self.conv_dcn(x, offset, mask=mask) return y class ConvNormLayer(nn.Layer): def __init__(self, ch_in, ch_out, filter_size, stride, groups=1, norm_type='bn', norm_decay=0., norm_groups=32, use_dcn=False, bias_on=False, lr_scale=1., freeze_norm=False, initializer=Normal( mean=0., std=0.01), skip_quant=False, dcn_lr_scale=2., dcn_regularizer=L2Decay(0.)): super(ConvNormLayer, self).__init__() assert norm_type in ['bn', 'sync_bn', 'gn', None] if bias_on: bias_attr = ParamAttr( initializer=Constant(value=0.), learning_rate=lr_scale) else: bias_attr = False if not use_dcn: self.conv = nn.Conv2D( in_channels=ch_in, out_channels=ch_out, kernel_size=filter_size, stride=stride, padding=(filter_size - 1) // 2, groups=groups, weight_attr=ParamAttr( initializer=initializer, learning_rate=1.), bias_attr=bias_attr) if skip_quant: self.conv.skip_quant = True else: # in FCOS-DCN head, specifically need learning_rate and regularizer self.conv = DeformableConvV2( in_channels=ch_in, out_channels=ch_out, kernel_size=filter_size, stride=stride, padding=(filter_size - 1) // 2, groups=groups, weight_attr=ParamAttr( initializer=initializer, learning_rate=1.), bias_attr=True, lr_scale=dcn_lr_scale, regularizer=dcn_regularizer, dcn_bias_regularizer=dcn_regularizer, dcn_bias_lr_scale=dcn_lr_scale, skip_quant=skip_quant) norm_lr = 0. if freeze_norm else 1. param_attr = ParamAttr( learning_rate=norm_lr, regularizer=L2Decay(norm_decay) if norm_decay is not None else None) bias_attr = ParamAttr( learning_rate=norm_lr, regularizer=L2Decay(norm_decay) if norm_decay is not None else None) if norm_type in ['bn', 'sync_bn']: self.norm = nn.BatchNorm2D( ch_out, weight_attr=param_attr, bias_attr=bias_attr) elif norm_type == 'gn': self.norm = nn.GroupNorm( num_groups=norm_groups, num_channels=ch_out, weight_attr=param_attr, bias_attr=bias_attr) else: self.norm = None def forward(self, inputs): out = self.conv(inputs) if self.norm is not None: out = self.norm(out) return out class DropBlock(nn.Layer): def __init__(self, block_size, keep_prob, name=None, data_format='NCHW'): """ DropBlock layer, see https://arxiv.org/abs/1810.12890 Args: block_size (int): block size keep_prob (int): keep probability name (str): layer name data_format (str): data format, NCHW or NHWC """ super(DropBlock, self).__init__() self.block_size = block_size self.keep_prob = keep_prob self.name = name self.data_format = data_format def forward(self, x): if not self.training or self.keep_prob == 1: return x else: gamma = (1. - self.keep_prob) / (self.block_size**2) if self.data_format == 'NCHW': shape = x.shape[2:] else: shape = x.shape[1:3] for s in shape: gamma *= s / (s - self.block_size + 1) matrix = paddle.cast(paddle.rand(x.shape) < gamma, x.dtype) mask_inv = F.max_pool2d( matrix, self.block_size, stride=1, padding=self.block_size // 2, data_format=self.data_format) mask = 1. - mask_inv y = x * mask * (mask.numel() / mask.sum()) return y @register @serializable class MultiClassNMS(object): def __init__(self, score_threshold=.05, nms_top_k=-1, keep_top_k=100, nms_threshold=.5, normalized=True, nms_eta=1.0, return_index=False, return_rois_num=True, trt=False): super(MultiClassNMS, self).__init__() self.score_threshold = score_threshold self.nms_top_k = nms_top_k self.keep_top_k = keep_top_k self.nms_threshold = nms_threshold self.normalized = normalized self.nms_eta = nms_eta self.return_index = return_index self.return_rois_num = return_rois_num self.trt = trt def __call__(self, bboxes, score, background_label=-1): """ bboxes (Tensor|List[Tensor]): 1. (Tensor) Predicted bboxes with shape [N, M, 4], N is the batch size and M is the number of bboxes 2. (List[Tensor]) bboxes and bbox_num, bboxes have shape of [M, C, 4], C is the class number and bbox_num means the number of bboxes of each batch with shape [N,] score (Tensor): Predicted scores with shape [N, C, M] or [M, C] background_label (int): Ignore the background label; For example, RCNN is num_classes and YOLO is -1. """ kwargs = self.__dict__.copy() if isinstance(bboxes, tuple): bboxes, bbox_num = bboxes kwargs.update({'rois_num': bbox_num}) if background_label > -1: kwargs.update({'background_label': background_label}) kwargs.pop('trt') # TODO(wangxinxin08): paddle version should be develop or 2.3 and above to run nms on tensorrt if self.trt and (int(paddle.version.major) == 0 or (int(paddle.version.major) >= 2 and int(paddle.version.minor) >= 3)): # TODO(wangxinxin08): tricky switch to run nms on tensorrt kwargs.update({'nms_eta': 1.1}) bbox, bbox_num, _ = ops.multiclass_nms(bboxes, score, **kwargs) bbox = bbox.reshape([1, -1, 6]) idx = paddle.nonzero(bbox[..., 0] != -1) bbox = paddle.gather_nd(bbox, idx) return bbox, bbox_num, None else: return ops.multiclass_nms(bboxes, score, **kwargs) @register @serializable class MatrixNMS(object): __append_doc__ = True def __init__(self, score_threshold=.05, post_threshold=.05, nms_top_k=-1, keep_top_k=100, use_gaussian=False, gaussian_sigma=2., normalized=False, background_label=0): super(MatrixNMS, self).__init__() self.score_threshold = score_threshold self.post_threshold = post_threshold self.nms_top_k = nms_top_k self.keep_top_k = keep_top_k self.normalized = normalized self.use_gaussian = use_gaussian self.gaussian_sigma = gaussian_sigma self.background_label = background_label def __call__(self, bbox, score, *args): return ops.matrix_nms( bboxes=bbox, scores=score, score_threshold=self.score_threshold, post_threshold=self.post_threshold, nms_top_k=self.nms_top_k, keep_top_k=self.keep_top_k, use_gaussian=self.use_gaussian, gaussian_sigma=self.gaussian_sigma, background_label=self.background_label, normalized=self.normalized) @register @serializable class YOLOBox(object): __shared__ = ['num_classes'] def __init__(self, num_classes=80, conf_thresh=0.005, downsample_ratio=32, clip_bbox=True, scale_x_y=1.): self.num_classes = num_classes self.conf_thresh = conf_thresh self.downsample_ratio = downsample_ratio self.clip_bbox = clip_bbox self.scale_x_y = scale_x_y def __call__(self, yolo_head_out, anchors, im_shape, scale_factor, var_weight=None): boxes_list = [] scores_list = [] origin_shape = im_shape / scale_factor origin_shape = paddle.cast(origin_shape, 'int32') for i, head_out in enumerate(yolo_head_out): boxes, scores = paddle.vision.ops.yolo_box( head_out, origin_shape, anchors[i], self.num_classes, self.conf_thresh, self.downsample_ratio // 2**i, self.clip_bbox, scale_x_y=self.scale_x_y) boxes_list.append(boxes) scores_list.append(paddle.transpose(scores, perm=[0, 2, 1])) yolo_boxes = paddle.concat(boxes_list, axis=1) yolo_scores = paddle.concat(scores_list, axis=2) return yolo_boxes, yolo_scores def Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True, weight_init=Normal(std=0.001), bias_init=Constant(0.)): weight_attr = paddle.framework.ParamAttr(initializer=weight_init) if bias: bias_attr = paddle.framework.ParamAttr(initializer=bias_init) else: bias_attr = False conv = nn.Conv2D( in_channels, out_channels, kernel_size, stride, padding, dilation, groups, weight_attr=weight_attr, bias_attr=bias_attr) return conv def ConvTranspose2d(in_channels, out_channels, kernel_size, stride=1, padding=0, output_padding=0, groups=1, bias=True, dilation=1, weight_init=Normal(std=0.001), bias_init=Constant(0.)): weight_attr = paddle.framework.ParamAttr(initializer=weight_init) if bias: bias_attr = paddle.framework.ParamAttr(initializer=bias_init) else: bias_attr = False conv = nn.Conv2DTranspose( in_channels, out_channels, kernel_size, stride, padding, output_padding, dilation, groups, weight_attr=weight_attr, bias_attr=bias_attr) return conv def BatchNorm2d(num_features, eps=1e-05, momentum=0.9, affine=True): if not affine: weight_attr = False bias_attr = False else: weight_attr = None bias_attr = None batchnorm = nn.BatchNorm2D( num_features, momentum, eps, weight_attr=weight_attr, bias_attr=bias_attr) return batchnorm def ReLU(): return nn.ReLU() def Upsample(scale_factor=None, mode='nearest', align_corners=False): return nn.Upsample(None, scale_factor, mode, align_corners) def MaxPool(kernel_size, stride, padding, ceil_mode=False): return nn.MaxPool2D(kernel_size, stride, padding, ceil_mode=ceil_mode) class Concat(nn.Layer): def __init__(self, dim=0): super(Concat, self).__init__() self.dim = dim def forward(self, inputs): return paddle.concat(inputs, axis=self.dim) def extra_repr(self): return 'dim={}'.format(self.dim) def _convert_attention_mask(attn_mask, dtype): """ Convert the attention mask to the target dtype we expect. Parameters: attn_mask (Tensor, optional): A tensor used in multi-head attention to prevents attention to some unwanted positions, usually the paddings or the subsequent positions. It is a tensor with shape broadcasted to `[batch_size, n_head, sequence_length, sequence_length]`. When the data type is bool, the unwanted positions have `False` values and the others have `True` values. When the data type is int, the unwanted positions have 0 values and the others have 1 values. When the data type is float, the unwanted positions have `-INF` values and the others have 0 values. It can be None when nothing wanted or needed to be prevented attention to. Default None. dtype (VarType): The target type of `attn_mask` we expect. Returns: Tensor: A Tensor with shape same as input `attn_mask`, with data type `dtype`. """ return nn.layer.transformer._convert_attention_mask(attn_mask, dtype) @register class MultiHeadAttention(nn.Layer): """ Attention mapps queries and a set of key-value pairs to outputs, and Multi-Head Attention performs multiple parallel attention to jointly attending to information from different representation subspaces. Please refer to `Attention Is All You Need `_ for more details. Parameters: embed_dim (int): The expected feature size in the input and output. num_heads (int): The number of heads in multi-head attention. dropout (float, optional): The dropout probability used on attention weights to drop some attention targets. 0 for no dropout. Default 0 kdim (int, optional): The feature size in key. If None, assumed equal to `embed_dim`. Default None. vdim (int, optional): The feature size in value. If None, assumed equal to `embed_dim`. Default None. need_weights (bool, optional): Indicate whether to return the attention weights. Default False. Examples: .. code-block:: python import paddle # encoder input: [batch_size, sequence_length, d_model] query = paddle.rand((2, 4, 128)) # self attention mask: [batch_size, num_heads, query_len, query_len] attn_mask = paddle.rand((2, 2, 4, 4)) multi_head_attn = paddle.nn.MultiHeadAttention(128, 2) output = multi_head_attn(query, None, None, attn_mask=attn_mask) # [2, 4, 128] """ def __init__(self, embed_dim, num_heads, dropout=0., kdim=None, vdim=None, need_weights=False): super(MultiHeadAttention, self).__init__() self.embed_dim = embed_dim self.kdim = kdim if kdim is not None else embed_dim self.vdim = vdim if vdim is not None else embed_dim self._qkv_same_embed_dim = self.kdim == embed_dim and self.vdim == embed_dim self.num_heads = num_heads self.dropout = dropout self.need_weights = need_weights self.head_dim = embed_dim // num_heads assert self.head_dim * num_heads == self.embed_dim, "embed_dim must be divisible by num_heads" if self._qkv_same_embed_dim: self.in_proj_weight = self.create_parameter( shape=[embed_dim, 3 * embed_dim], attr=None, dtype=self._dtype, is_bias=False) self.in_proj_bias = self.create_parameter( shape=[3 * embed_dim], attr=None, dtype=self._dtype, is_bias=True) else: self.q_proj = nn.Linear(embed_dim, embed_dim) self.k_proj = nn.Linear(self.kdim, embed_dim) self.v_proj = nn.Linear(self.vdim, embed_dim) self.out_proj = nn.Linear(embed_dim, embed_dim) self._type_list = ('q_proj', 'k_proj', 'v_proj') self._reset_parameters() def _reset_parameters(self): for p in self.parameters(): if p.dim() > 1: xavier_uniform_(p) else: constant_(p) def compute_qkv(self, tensor, index): if self._qkv_same_embed_dim: tensor = F.linear( x=tensor, weight=self.in_proj_weight[:, index * self.embed_dim:(index + 1) * self.embed_dim], bias=self.in_proj_bias[index * self.embed_dim:(index + 1) * self.embed_dim] if self.in_proj_bias is not None else None) else: tensor = getattr(self, self._type_list[index])(tensor) tensor = tensor.reshape( [0, 0, self.num_heads, self.head_dim]).transpose([0, 2, 1, 3]) return tensor def forward(self, query, key=None, value=None, attn_mask=None): r""" Applies multi-head attention to map queries and a set of key-value pairs to outputs. Parameters: query (Tensor): The queries for multi-head attention. It is a tensor with shape `[batch_size, query_length, embed_dim]`. The data type should be float32 or float64. key (Tensor, optional): The keys for multi-head attention. It is a tensor with shape `[batch_size, key_length, kdim]`. The data type should be float32 or float64. If None, use `query` as `key`. Default None. value (Tensor, optional): The values for multi-head attention. It is a tensor with shape `[batch_size, value_length, vdim]`. The data type should be float32 or float64. If None, use `query` as `value`. Default None. attn_mask (Tensor, optional): A tensor used in multi-head attention to prevents attention to some unwanted positions, usually the paddings or the subsequent positions. It is a tensor with shape broadcasted to `[batch_size, n_head, sequence_length, sequence_length]`. When the data type is bool, the unwanted positions have `False` values and the others have `True` values. When the data type is int, the unwanted positions have 0 values and the others have 1 values. When the data type is float, the unwanted positions have `-INF` values and the others have 0 values. It can be None when nothing wanted or needed to be prevented attention to. Default None. Returns: Tensor|tuple: It is a tensor that has the same shape and data type \ as `query`, representing attention output. Or a tuple if \ `need_weights` is True or `cache` is not None. If `need_weights` \ is True, except for attention output, the tuple also includes \ the attention weights tensor shaped `[batch_size, num_heads, query_length, key_length]`. \ If `cache` is not None, the tuple then includes the new cache \ having the same type as `cache`, and if it is `StaticCache`, it \ is same as the input `cache`, if it is `Cache`, the new cache \ reserves tensors concatanating raw tensors with intermediate \ results of current query. """ key = query if key is None else key value = query if value is None else value # compute q ,k ,v q, k, v = (self.compute_qkv(t, i) for i, t in enumerate([query, key, value])) # scale dot product attention product = paddle.matmul(x=q, y=k, transpose_y=True) scaling = float(self.head_dim)**-0.5 product = product * scaling if attn_mask is not None: # Support bool or int mask attn_mask = _convert_attention_mask(attn_mask, product.dtype) product = product + attn_mask weights = F.softmax(product) if self.dropout: weights = F.dropout( weights, self.dropout, training=self.training, mode="upscale_in_train") out = paddle.matmul(weights, v) # combine heads out = paddle.transpose(out, perm=[0, 2, 1, 3]) out = paddle.reshape(x=out, shape=[0, 0, out.shape[2] * out.shape[3]]) # project to output out = self.out_proj(out) outs = [out] if self.need_weights: outs.append(weights) return out if len(outs) == 1 else tuple(outs)