# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ this code is base on https://github.com/hikvision-research/opera/blob/main/opera/models/utils/transformer.py """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import math import numpy as np import paddle import paddle.nn as nn import paddle.nn.functional as F from paddle import ParamAttr from ppdet.core.workspace import register from ..layers import MultiHeadAttention, _convert_attention_mask from .utils import _get_clones from ..initializer import linear_init_, normal_, constant_, xavier_uniform_ __all__ = [ 'PETRTransformer', 'MultiScaleDeformablePoseAttention', 'PETR_TransformerDecoderLayer', 'PETR_TransformerDecoder', 'PETR_DeformableDetrTransformerDecoder', 'PETR_DeformableTransformerDecoder', 'TransformerEncoderLayer', 'TransformerEncoder', 'MSDeformableAttention' ] def masked_fill(x, mask, value): y = paddle.full(x.shape, value, x.dtype) return paddle.where(mask, y, x) def inverse_sigmoid(x, eps=1e-5): """Inverse function of sigmoid. Args: x (Tensor): The tensor to do the inverse. eps (float): EPS avoid numerical overflow. Defaults 1e-5. Returns: Tensor: The x has passed the inverse function of sigmoid, has same shape with input. """ x = x.clip(min=0, max=1) x1 = x.clip(min=eps) x2 = (1 - x).clip(min=eps) return paddle.log(x1 / x2) @register class TransformerEncoderLayer(nn.Layer): __inject__ = ['attn'] def __init__(self, d_model, attn=None, nhead=8, dim_feedforward=2048, dropout=0.1, activation="relu", attn_dropout=None, act_dropout=None, normalize_before=False): super(TransformerEncoderLayer, self).__init__() attn_dropout = dropout if attn_dropout is None else attn_dropout act_dropout = dropout if act_dropout is None else act_dropout self.normalize_before = normalize_before self.embed_dims = d_model if attn is None: self.self_attn = MultiHeadAttention(d_model, nhead, attn_dropout) else: self.self_attn = attn # Implementation of Feedforward model self.linear1 = nn.Linear(d_model, dim_feedforward) self.dropout = nn.Dropout(act_dropout, mode="upscale_in_train") self.linear2 = nn.Linear(dim_feedforward, d_model) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout, mode="upscale_in_train") self.dropout2 = nn.Dropout(dropout, mode="upscale_in_train") self.activation = getattr(F, activation) self._reset_parameters() def _reset_parameters(self): linear_init_(self.linear1) linear_init_(self.linear2) @staticmethod def with_pos_embed(tensor, pos_embed): return tensor if pos_embed is None else tensor + pos_embed def forward(self, src, src_mask=None, pos_embed=None, **kwargs): residual = src if self.normalize_before: src = self.norm1(src) q = k = self.with_pos_embed(src, pos_embed) src = self.self_attn(q, k, value=src, attn_mask=src_mask, **kwargs) src = residual + self.dropout1(src) if not self.normalize_before: src = self.norm1(src) residual = src if self.normalize_before: src = self.norm2(src) src = self.linear2(self.dropout(self.activation(self.linear1(src)))) src = residual + self.dropout2(src) if not self.normalize_before: src = self.norm2(src) return src @register class TransformerEncoder(nn.Layer): __inject__ = ['encoder_layer'] def __init__(self, encoder_layer, num_layers, norm=None): super(TransformerEncoder, self).__init__() self.layers = _get_clones(encoder_layer, num_layers) self.num_layers = num_layers self.norm = norm self.embed_dims = encoder_layer.embed_dims def forward(self, src, src_mask=None, pos_embed=None, **kwargs): output = src for layer in self.layers: output = layer( output, src_mask=src_mask, pos_embed=pos_embed, **kwargs) if self.norm is not None: output = self.norm(output) return output @register class MSDeformableAttention(nn.Layer): def __init__(self, embed_dim=256, num_heads=8, num_levels=4, num_points=4, lr_mult=0.1): """ Multi-Scale Deformable Attention Module """ super(MSDeformableAttention, self).__init__() self.embed_dim = embed_dim self.num_heads = num_heads self.num_levels = num_levels self.num_points = num_points self.total_points = num_heads * num_levels * num_points self.head_dim = embed_dim // num_heads assert self.head_dim * num_heads == self.embed_dim, "embed_dim must be divisible by num_heads" self.sampling_offsets = nn.Linear( embed_dim, self.total_points * 2, weight_attr=ParamAttr(learning_rate=lr_mult), bias_attr=ParamAttr(learning_rate=lr_mult)) self.attention_weights = nn.Linear(embed_dim, self.total_points) self.value_proj = nn.Linear(embed_dim, embed_dim) self.output_proj = nn.Linear(embed_dim, embed_dim) try: # use cuda op print("use deformable_detr_ops in ms_deformable_attn") from deformable_detr_ops import ms_deformable_attn except: # use paddle func from .utils import deformable_attention_core_func as ms_deformable_attn self.ms_deformable_attn_core = ms_deformable_attn self._reset_parameters() def _reset_parameters(self): # sampling_offsets constant_(self.sampling_offsets.weight) thetas = paddle.arange( self.num_heads, dtype=paddle.float32) * (2.0 * math.pi / self.num_heads) grid_init = paddle.stack([thetas.cos(), thetas.sin()], -1) grid_init = grid_init / grid_init.abs().max(-1, keepdim=True) grid_init = grid_init.reshape([self.num_heads, 1, 1, 2]).tile( [1, self.num_levels, self.num_points, 1]) scaling = paddle.arange( 1, self.num_points + 1, dtype=paddle.float32).reshape([1, 1, -1, 1]) grid_init *= scaling self.sampling_offsets.bias.set_value(grid_init.flatten()) # attention_weights constant_(self.attention_weights.weight) constant_(self.attention_weights.bias) # proj xavier_uniform_(self.value_proj.weight) constant_(self.value_proj.bias) xavier_uniform_(self.output_proj.weight) constant_(self.output_proj.bias) def forward(self, query, key, value, reference_points, value_spatial_shapes, value_level_start_index, attn_mask=None, **kwargs): """ Args: query (Tensor): [bs, query_length, C] reference_points (Tensor): [bs, query_length, n_levels, 2], range in [0, 1], top-left (0,0), bottom-right (1, 1), including padding area value (Tensor): [bs, value_length, C] value_spatial_shapes (Tensor): [n_levels, 2], [(H_0, W_0), (H_1, W_1), ..., (H_{L-1}, W_{L-1})] value_level_start_index (Tensor(int64)): [n_levels], [0, H_0*W_0, H_0*W_0+H_1*W_1, ...] attn_mask (Tensor): [bs, value_length], True for non-padding elements, False for padding elements Returns: output (Tensor): [bs, Length_{query}, C] """ bs, Len_q = query.shape[:2] Len_v = value.shape[1] assert int(value_spatial_shapes.prod(1).sum()) == Len_v value = self.value_proj(value) if attn_mask is not None: attn_mask = attn_mask.astype(value.dtype).unsqueeze(-1) value *= attn_mask value = value.reshape([bs, Len_v, self.num_heads, self.head_dim]) sampling_offsets = self.sampling_offsets(query).reshape( [bs, Len_q, self.num_heads, self.num_levels, self.num_points, 2]) attention_weights = self.attention_weights(query).reshape( [bs, Len_q, self.num_heads, self.num_levels * self.num_points]) attention_weights = F.softmax(attention_weights).reshape( [bs, Len_q, self.num_heads, self.num_levels, self.num_points]) if reference_points.shape[-1] == 2: offset_normalizer = value_spatial_shapes.flip([1]).reshape( [1, 1, 1, self.num_levels, 1, 2]) sampling_locations = reference_points.reshape([ bs, Len_q, 1, self.num_levels, 1, 2 ]) + sampling_offsets / offset_normalizer elif reference_points.shape[-1] == 4: sampling_locations = ( reference_points[:, :, None, :, None, :2] + sampling_offsets / self.num_points * reference_points[:, :, None, :, None, 2:] * 0.5) else: raise ValueError( "Last dim of reference_points must be 2 or 4, but get {} instead.". format(reference_points.shape[-1])) output = self.ms_deformable_attn_core( value, value_spatial_shapes, value_level_start_index, sampling_locations, attention_weights) output = self.output_proj(output) return output @register class MultiScaleDeformablePoseAttention(nn.Layer): """An attention module used in PETR. `End-to-End Multi-Person Pose Estimation with Transformers`. Args: embed_dims (int): The embedding dimension of Attention. Default: 256. num_heads (int): Parallel attention heads. Default: 8. num_levels (int): The number of feature map used in Attention. Default: 4. num_points (int): The number of sampling points for each query in each head. Default: 17. im2col_step (int): The step used in image_to_column. Default: 64. dropout (float): A Dropout layer on `inp_residual`. Default: 0.1. init_cfg (obj:`mmcv.ConfigDict`): The Config for initialization. Default: None. """ def __init__(self, embed_dims=256, num_heads=8, num_levels=4, num_points=17, im2col_step=64, dropout=0.1, norm_cfg=None, init_cfg=None, batch_first=False, lr_mult=0.1): super().__init__() if embed_dims % num_heads != 0: raise ValueError(f'embed_dims must be divisible by num_heads, ' f'but got {embed_dims} and {num_heads}') dim_per_head = embed_dims // num_heads self.norm_cfg = norm_cfg self.init_cfg = init_cfg self.dropout = nn.Dropout(dropout) self.batch_first = batch_first # you'd better set dim_per_head to a power of 2 # which is more efficient in the CUDA implementation def _is_power_of_2(n): if (not isinstance(n, int)) or (n < 0): raise ValueError( 'invalid input for _is_power_of_2: {} (type: {})'.format( n, type(n))) return (n & (n - 1) == 0) and n != 0 if not _is_power_of_2(dim_per_head): warnings.warn("You'd better set embed_dims in " 'MultiScaleDeformAttention to make ' 'the dimension of each attention head a power of 2 ' 'which is more efficient in our CUDA implementation.') self.im2col_step = im2col_step self.embed_dims = embed_dims self.num_levels = num_levels self.num_heads = num_heads self.num_points = num_points self.sampling_offsets = nn.Linear( embed_dims, num_heads * num_levels * num_points * 2, weight_attr=ParamAttr(learning_rate=lr_mult), bias_attr=ParamAttr(learning_rate=lr_mult)) self.attention_weights = nn.Linear(embed_dims, num_heads * num_levels * num_points) self.value_proj = nn.Linear(embed_dims, embed_dims) self.output_proj = nn.Linear(embed_dims, embed_dims) try: # use cuda op from deformable_detr_ops import ms_deformable_attn except: # use paddle func from .utils import deformable_attention_core_func as ms_deformable_attn self.ms_deformable_attn_core = ms_deformable_attn self.init_weights() def init_weights(self): """Default initialization for Parameters of Module.""" constant_(self.sampling_offsets.weight) constant_(self.sampling_offsets.bias) constant_(self.attention_weights.weight) constant_(self.attention_weights.bias) xavier_uniform_(self.value_proj.weight) constant_(self.value_proj.bias) xavier_uniform_(self.output_proj.weight) constant_(self.output_proj.bias) def forward(self, query, key, value, residual=None, attn_mask=None, reference_points=None, value_spatial_shapes=None, value_level_start_index=None, **kwargs): """Forward Function of MultiScaleDeformAttention. Args: query (Tensor): Query of Transformer with shape (num_query, bs, embed_dims). key (Tensor): The key tensor with shape (num_key, bs, embed_dims). value (Tensor): The value tensor with shape (num_key, bs, embed_dims). residual (Tensor): The tensor used for addition, with the same shape as `x`. Default None. If None, `x` will be used. reference_points (Tensor): The normalized reference points with shape (bs, num_query, num_levels, K*2), all elements is range in [0, 1], top-left (0,0), bottom-right (1, 1), including padding area. attn_mask (Tensor): ByteTensor for `query`, with shape [bs, num_key]. value_spatial_shapes (Tensor): Spatial shape of features in different level. With shape (num_levels, 2), last dimension represent (h, w). value_level_start_index (Tensor): The start index of each level. A tensor has shape (num_levels) and can be represented as [0, h_0*w_0, h_0*w_0+h_1*w_1, ...]. Returns: Tensor: forwarded results with shape [num_query, bs, embed_dims]. """ if key is None: key = query if value is None: value = key bs, num_query, _ = query.shape bs, num_key, _ = value.shape assert (value_spatial_shapes[:, 0].numpy() * value_spatial_shapes[:, 1].numpy()).sum() == num_key value = self.value_proj(value) if attn_mask is not None: # value = value.masked_fill(attn_mask[..., None], 0.0) value *= attn_mask.unsqueeze(-1) value = value.reshape([bs, num_key, self.num_heads, -1]) sampling_offsets = self.sampling_offsets(query).reshape([ bs, num_query, self.num_heads, self.num_levels, self.num_points, 2 ]) attention_weights = self.attention_weights(query).reshape( [bs, num_query, self.num_heads, self.num_levels * self.num_points]) attention_weights = F.softmax(attention_weights, axis=-1) attention_weights = attention_weights.reshape( [bs, num_query, self.num_heads, self.num_levels, self.num_points]) if reference_points.shape[-1] == self.num_points * 2: reference_points_reshape = reference_points.reshape( (bs, num_query, self.num_levels, -1, 2)).unsqueeze(2) x1 = reference_points[:, :, :, 0::2].min(axis=-1, keepdim=True) y1 = reference_points[:, :, :, 1::2].min(axis=-1, keepdim=True) x2 = reference_points[:, :, :, 0::2].max(axis=-1, keepdim=True) y2 = reference_points[:, :, :, 1::2].max(axis=-1, keepdim=True) w = paddle.clip(x2 - x1, min=1e-4) h = paddle.clip(y2 - y1, min=1e-4) wh = paddle.concat([w, h], axis=-1)[:, :, None, :, None, :] sampling_locations = reference_points_reshape \ + sampling_offsets * wh * 0.5 else: raise ValueError( f'Last dim of reference_points must be' f' 2K, but get {reference_points.shape[-1]} instead.') output = self.ms_deformable_attn_core( value, value_spatial_shapes, value_level_start_index, sampling_locations, attention_weights) output = self.output_proj(output) return output @register class PETR_TransformerDecoderLayer(nn.Layer): __inject__ = ['self_attn', 'cross_attn'] def __init__(self, d_model, nhead=8, self_attn=None, cross_attn=None, dim_feedforward=2048, dropout=0.1, activation="relu", attn_dropout=None, act_dropout=None, normalize_before=False): super(PETR_TransformerDecoderLayer, self).__init__() attn_dropout = dropout if attn_dropout is None else attn_dropout act_dropout = dropout if act_dropout is None else act_dropout self.normalize_before = normalize_before if self_attn is None: self.self_attn = MultiHeadAttention(d_model, nhead, attn_dropout) else: self.self_attn = self_attn if cross_attn is None: self.cross_attn = MultiHeadAttention(d_model, nhead, attn_dropout) else: self.cross_attn = cross_attn # Implementation of Feedforward model self.linear1 = nn.Linear(d_model, dim_feedforward) self.dropout = nn.Dropout(act_dropout, mode="upscale_in_train") self.linear2 = nn.Linear(dim_feedforward, d_model) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout, mode="upscale_in_train") self.dropout2 = nn.Dropout(dropout, mode="upscale_in_train") self.dropout3 = nn.Dropout(dropout, mode="upscale_in_train") self.activation = getattr(F, activation) self._reset_parameters() def _reset_parameters(self): linear_init_(self.linear1) linear_init_(self.linear2) @staticmethod def with_pos_embed(tensor, pos_embed): return tensor if pos_embed is None else tensor + pos_embed def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, pos_embed=None, query_pos_embed=None, **kwargs): tgt_mask = _convert_attention_mask(tgt_mask, tgt.dtype) residual = tgt if self.normalize_before: tgt = self.norm1(tgt) q = k = self.with_pos_embed(tgt, query_pos_embed) tgt = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask) tgt = residual + self.dropout1(tgt) if not self.normalize_before: tgt = self.norm1(tgt) residual = tgt if self.normalize_before: tgt = self.norm2(tgt) q = self.with_pos_embed(tgt, query_pos_embed) key_tmp = tgt # k = self.with_pos_embed(memory, pos_embed) tgt = self.cross_attn( q, key=key_tmp, value=memory, attn_mask=memory_mask, **kwargs) tgt = residual + self.dropout2(tgt) if not self.normalize_before: tgt = self.norm2(tgt) residual = tgt if self.normalize_before: tgt = self.norm3(tgt) tgt = self.linear2(self.dropout(self.activation(self.linear1(tgt)))) tgt = residual + self.dropout3(tgt) if not self.normalize_before: tgt = self.norm3(tgt) return tgt @register class PETR_TransformerDecoder(nn.Layer): """Implements the decoder in PETR transformer. Args: return_intermediate (bool): Whether to return intermediate outputs. coder_norm_cfg (dict): Config of last normalization layer. Default: `LN`. """ __inject__ = ['decoder_layer'] def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False, num_keypoints=17, **kwargs): super(PETR_TransformerDecoder, self).__init__() self.layers = _get_clones(decoder_layer, num_layers) self.num_layers = num_layers self.norm = norm self.return_intermediate = return_intermediate self.num_keypoints = num_keypoints def forward(self, query, *args, reference_points=None, valid_ratios=None, kpt_branches=None, **kwargs): """Forward function for `TransformerDecoder`. Args: query (Tensor): Input query with shape (num_query, bs, embed_dims). reference_points (Tensor): The reference points of offset, has shape (bs, num_query, K*2). valid_ratios (Tensor): The radios of valid points on the feature map, has shape (bs, num_levels, 2). kpt_branches: (obj:`nn.LayerList`): Used for refining the regression results. Only would be passed when `with_box_refine` is True, otherwise would be passed a `None`. Returns: tuple (Tensor): Results with shape [1, num_query, bs, embed_dims] when return_intermediate is `False`, otherwise it has shape [num_layers, num_query, bs, embed_dims] and [num_layers, bs, num_query, K*2]. """ output = query intermediate = [] intermediate_reference_points = [] for lid, layer in enumerate(self.layers): if reference_points.shape[-1] == self.num_keypoints * 2: reference_points_input = \ reference_points[:, :, None] * \ valid_ratios.tile((1, 1, self.num_keypoints))[:, None] else: assert reference_points.shape[-1] == 2 reference_points_input = reference_points[:, :, None] * \ valid_ratios[:, None] output = layer( output, *args, reference_points=reference_points_input, **kwargs) if kpt_branches is not None: tmp = kpt_branches[lid](output) if reference_points.shape[-1] == self.num_keypoints * 2: new_reference_points = tmp + inverse_sigmoid( reference_points) new_reference_points = F.sigmoid(new_reference_points) else: raise NotImplementedError reference_points = new_reference_points.detach() if self.return_intermediate: intermediate.append(output) intermediate_reference_points.append(reference_points) if self.return_intermediate: return paddle.stack(intermediate), paddle.stack( intermediate_reference_points) return output, reference_points @register class PETR_DeformableTransformerDecoder(nn.Layer): __inject__ = ['decoder_layer'] def __init__(self, decoder_layer, num_layers, return_intermediate=False): super(PETR_DeformableTransformerDecoder, self).__init__() self.layers = _get_clones(decoder_layer, num_layers) self.num_layers = num_layers self.return_intermediate = return_intermediate def forward(self, tgt, reference_points, memory, memory_spatial_shapes, memory_mask=None, query_pos_embed=None): output = tgt intermediate = [] for lid, layer in enumerate(self.layers): output = layer(output, reference_points, memory, memory_spatial_shapes, memory_mask, query_pos_embed) if self.return_intermediate: intermediate.append(output) if self.return_intermediate: return paddle.stack(intermediate) return output.unsqueeze(0) @register class PETR_DeformableDetrTransformerDecoder(PETR_DeformableTransformerDecoder): """Implements the decoder in DETR transformer. Args: return_intermediate (bool): Whether to return intermediate outputs. coder_norm_cfg (dict): Config of last normalization layer. Default: `LN`. """ def __init__(self, *args, return_intermediate=False, **kwargs): super(PETR_DeformableDetrTransformerDecoder, self).__init__(*args, **kwargs) self.return_intermediate = return_intermediate def forward(self, query, *args, reference_points=None, valid_ratios=None, reg_branches=None, **kwargs): """Forward function for `TransformerDecoder`. Args: query (Tensor): Input query with shape `(num_query, bs, embed_dims)`. reference_points (Tensor): The reference points of offset. has shape (bs, num_query, 4) when as_two_stage, otherwise has shape ((bs, num_query, 2). valid_ratios (Tensor): The radios of valid points on the feature map, has shape (bs, num_levels, 2) reg_branch: (obj:`nn.LayerList`): Used for refining the regression results. Only would be passed when with_box_refine is True, otherwise would be passed a `None`. Returns: Tensor: Results with shape [1, num_query, bs, embed_dims] when return_intermediate is `False`, otherwise it has shape [num_layers, num_query, bs, embed_dims]. """ output = query intermediate = [] intermediate_reference_points = [] for lid, layer in enumerate(self.layers): if reference_points.shape[-1] == 4: reference_points_input = reference_points[:, :, None] * \ paddle.concat([valid_ratios, valid_ratios], -1)[:, None] else: assert reference_points.shape[-1] == 2 reference_points_input = reference_points[:, :, None] * \ valid_ratios[:, None] output = layer( output, *args, reference_points=reference_points_input, **kwargs) if reg_branches is not None: tmp = reg_branches[lid](output) if reference_points.shape[-1] == 4: new_reference_points = tmp + inverse_sigmoid( reference_points) new_reference_points = F.sigmoid(new_reference_points) else: assert reference_points.shape[-1] == 2 new_reference_points = tmp new_reference_points[..., :2] = tmp[ ..., :2] + inverse_sigmoid(reference_points) new_reference_points = F.sigmoid(new_reference_points) reference_points = new_reference_points.detach() if self.return_intermediate: intermediate.append(output) intermediate_reference_points.append(reference_points) if self.return_intermediate: return paddle.stack(intermediate), paddle.stack( intermediate_reference_points) return output, reference_points @register class PETRTransformer(nn.Layer): """Implements the PETR transformer. Args: as_two_stage (bool): Generate query from encoder features. Default: False. num_feature_levels (int): Number of feature maps from FPN: Default: 4. two_stage_num_proposals (int): Number of proposals when set `as_two_stage` as True. Default: 300. """ __inject__ = ["encoder", "decoder", "hm_encoder", "refine_decoder"] def __init__(self, encoder="", decoder="", hm_encoder="", refine_decoder="", as_two_stage=True, num_feature_levels=4, two_stage_num_proposals=300, num_keypoints=17, **kwargs): super(PETRTransformer, self).__init__(**kwargs) self.as_two_stage = as_two_stage self.num_feature_levels = num_feature_levels self.two_stage_num_proposals = two_stage_num_proposals self.num_keypoints = num_keypoints self.encoder = encoder self.decoder = decoder self.embed_dims = self.encoder.embed_dims self.hm_encoder = hm_encoder self.refine_decoder = refine_decoder self.init_layers() self.init_weights() def init_layers(self): """Initialize layers of the DeformableDetrTransformer.""" #paddle.create_parameter self.level_embeds = paddle.create_parameter( (self.num_feature_levels, self.embed_dims), dtype="float32") if self.as_two_stage: self.enc_output = nn.Linear(self.embed_dims, self.embed_dims) self.enc_output_norm = nn.LayerNorm(self.embed_dims) self.refine_query_embedding = nn.Embedding(self.num_keypoints, self.embed_dims * 2) else: self.reference_points = nn.Linear(self.embed_dims, 2 * self.num_keypoints) def init_weights(self): """Initialize the transformer weights.""" for p in self.parameters(): if p.rank() > 1: xavier_uniform_(p) if hasattr(p, 'bias') and p.bias is not None: constant_(p.bais) for m in self.sublayers(): if isinstance(m, MSDeformableAttention): m._reset_parameters() for m in self.sublayers(): if isinstance(m, MultiScaleDeformablePoseAttention): m.init_weights() if not self.as_two_stage: xavier_uniform_(self.reference_points.weight) constant_(self.reference_points.bias) normal_(self.level_embeds) normal_(self.refine_query_embedding.weight) def gen_encoder_output_proposals(self, memory, memory_padding_mask, spatial_shapes): """Generate proposals from encoded memory. Args: memory (Tensor): The output of encoder, has shape (bs, num_key, embed_dim). num_key is equal the number of points on feature map from all level. memory_padding_mask (Tensor): Padding mask for memory. has shape (bs, num_key). spatial_shapes (Tensor): The shape of all feature maps. has shape (num_level, 2). Returns: tuple: A tuple of feature map and bbox prediction. - output_memory (Tensor): The input of decoder, has shape (bs, num_key, embed_dim). num_key is equal the number of points on feature map from all levels. - output_proposals (Tensor): The normalized proposal after a inverse sigmoid, has shape (bs, num_keys, 4). """ N, S, C = memory.shape proposals = [] _cur = 0 for lvl, (H, W) in enumerate(spatial_shapes): mask_flatten_ = memory_padding_mask[:, _cur:(_cur + H * W)].reshape( [N, H, W, 1]) valid_H = paddle.sum(mask_flatten_[:, :, 0, 0], 1) valid_W = paddle.sum(mask_flatten_[:, 0, :, 0], 1) grid_y, grid_x = paddle.meshgrid( paddle.linspace( 0, H - 1, H, dtype="float32"), paddle.linspace( 0, W - 1, W, dtype="float32")) grid = paddle.concat([grid_x.unsqueeze(-1), grid_y.unsqueeze(-1)], -1) scale = paddle.concat( [valid_W.unsqueeze(-1), valid_H.unsqueeze(-1)], 1).reshape([N, 1, 1, 2]) grid = (grid.unsqueeze(0).expand((N, -1, -1, -1)) + 0.5) / scale proposal = grid.reshape([N, -1, 2]) proposals.append(proposal) _cur += (H * W) output_proposals = paddle.concat(proposals, 1) output_proposals_valid = ((output_proposals > 0.01) & (output_proposals < 0.99)).all( -1, keepdim=True).astype("bool") output_proposals = paddle.log(output_proposals / (1 - output_proposals)) output_proposals = masked_fill( output_proposals, ~memory_padding_mask.astype("bool").unsqueeze(-1), float('inf')) output_proposals = masked_fill(output_proposals, ~output_proposals_valid, float('inf')) output_memory = memory output_memory = masked_fill( output_memory, ~memory_padding_mask.astype("bool").unsqueeze(-1), float(0)) output_memory = masked_fill(output_memory, ~output_proposals_valid, float(0)) output_memory = self.enc_output_norm(self.enc_output(output_memory)) return output_memory, output_proposals @staticmethod def get_reference_points(spatial_shapes, valid_ratios): """Get the reference points used in decoder. Args: spatial_shapes (Tensor): The shape of all feature maps, has shape (num_level, 2). valid_ratios (Tensor): The radios of valid points on the feature map, has shape (bs, num_levels, 2). Returns: Tensor: reference points used in decoder, has \ shape (bs, num_keys, num_levels, 2). """ reference_points_list = [] for lvl, (H, W) in enumerate(spatial_shapes): ref_y, ref_x = paddle.meshgrid( paddle.linspace( 0.5, H - 0.5, H, dtype="float32"), paddle.linspace( 0.5, W - 0.5, W, dtype="float32")) ref_y = ref_y.reshape( (-1, ))[None] / (valid_ratios[:, None, lvl, 1] * H) ref_x = ref_x.reshape( (-1, ))[None] / (valid_ratios[:, None, lvl, 0] * W) ref = paddle.stack((ref_x, ref_y), -1) reference_points_list.append(ref) reference_points = paddle.concat(reference_points_list, 1) reference_points = reference_points[:, :, None] * valid_ratios[:, None] return reference_points def get_valid_ratio(self, mask): """Get the valid radios of feature maps of all level.""" _, H, W = mask.shape valid_H = paddle.sum(mask[:, :, 0].astype('float'), 1) valid_W = paddle.sum(mask[:, 0, :].astype('float'), 1) valid_ratio_h = valid_H.astype('float') / H valid_ratio_w = valid_W.astype('float') / W valid_ratio = paddle.stack([valid_ratio_w, valid_ratio_h], -1) return valid_ratio def get_proposal_pos_embed(self, proposals, num_pos_feats=128, temperature=10000): """Get the position embedding of proposal.""" scale = 2 * math.pi dim_t = paddle.arange(num_pos_feats, dtype="float32") dim_t = temperature**(2 * (dim_t // 2) / num_pos_feats) # N, L, 4 proposals = F.sigmoid(proposals) * scale # N, L, 4, 128 pos = proposals[:, :, :, None] / dim_t # N, L, 4, 64, 2 pos = paddle.stack( (pos[:, :, :, 0::2].sin(), pos[:, :, :, 1::2].cos()), axis=4).flatten(2) return pos def forward(self, mlvl_feats, mlvl_masks, query_embed, mlvl_pos_embeds, kpt_branches=None, cls_branches=None): """Forward function for `Transformer`. Args: mlvl_feats (list(Tensor)): Input queries from different level. Each element has shape [bs, embed_dims, h, w]. mlvl_masks (list(Tensor)): The key_padding_mask from different level used for encoder and decoder, each element has shape [bs, h, w]. query_embed (Tensor): The query embedding for decoder, with shape [num_query, c]. mlvl_pos_embeds (list(Tensor)): The positional encoding of feats from different level, has the shape [bs, embed_dims, h, w]. kpt_branches (obj:`nn.LayerList`): Keypoint Regression heads for feature maps from each decoder layer. Only would be passed when `with_box_refine` is Ture. Default to None. cls_branches (obj:`nn.LayerList`): Classification heads for feature maps from each decoder layer. Only would be passed when `as_two_stage` is Ture. Default to None. Returns: tuple[Tensor]: results of decoder containing the following tensor. - inter_states: Outputs from decoder. If `return_intermediate_dec` is True output has shape \ (num_dec_layers, bs, num_query, embed_dims), else has \ shape (1, bs, num_query, embed_dims). - init_reference_out: The initial value of reference \ points, has shape (bs, num_queries, 4). - inter_references_out: The internal value of reference \ points in decoder, has shape \ (num_dec_layers, bs,num_query, embed_dims) - enc_outputs_class: The classification score of proposals \ generated from encoder's feature maps, has shape \ (batch, h*w, num_classes). \ Only would be returned when `as_two_stage` is True, \ otherwise None. - enc_outputs_kpt_unact: The regression results generated from \ encoder's feature maps., has shape (batch, h*w, K*2). Only would be returned when `as_two_stage` is True, \ otherwise None. """ assert self.as_two_stage or query_embed is not None feat_flatten = [] mask_flatten = [] lvl_pos_embed_flatten = [] spatial_shapes = [] for lvl, (feat, mask, pos_embed ) in enumerate(zip(mlvl_feats, mlvl_masks, mlvl_pos_embeds)): bs, c, h, w = feat.shape spatial_shape = (h, w) spatial_shapes.append(spatial_shape) feat = feat.flatten(2).transpose((0, 2, 1)) mask = mask.flatten(1) pos_embed = pos_embed.flatten(2).transpose((0, 2, 1)) lvl_pos_embed = pos_embed + self.level_embeds[lvl].reshape( [1, 1, -1]) lvl_pos_embed_flatten.append(lvl_pos_embed) feat_flatten.append(feat) mask_flatten.append(mask) feat_flatten = paddle.concat(feat_flatten, 1) mask_flatten = paddle.concat(mask_flatten, 1) lvl_pos_embed_flatten = paddle.concat(lvl_pos_embed_flatten, 1) spatial_shapes_cumsum = paddle.to_tensor( np.array(spatial_shapes).prod(1).cumsum(0)) spatial_shapes = paddle.to_tensor(spatial_shapes, dtype="int64") level_start_index = paddle.concat((paddle.zeros( (1, ), dtype=spatial_shapes.dtype), spatial_shapes_cumsum[:-1])) valid_ratios = paddle.stack( [self.get_valid_ratio(m) for m in mlvl_masks], 1) reference_points = \ self.get_reference_points(spatial_shapes, valid_ratios) memory = self.encoder( src=feat_flatten, pos_embed=lvl_pos_embed_flatten, src_mask=mask_flatten, value_spatial_shapes=spatial_shapes, reference_points=reference_points, value_level_start_index=level_start_index, valid_ratios=valid_ratios) bs, _, c = memory.shape hm_proto = None if self.training: hm_memory = paddle.slice( memory, starts=level_start_index[0], ends=level_start_index[1], axes=[1]) hm_pos_embed = paddle.slice( lvl_pos_embed_flatten, starts=level_start_index[0], ends=level_start_index[1], axes=[1]) hm_mask = paddle.slice( mask_flatten, starts=level_start_index[0], ends=level_start_index[1], axes=[1]) hm_reference_points = paddle.slice( reference_points, starts=level_start_index[0], ends=level_start_index[1], axes=[1])[:, :, :1, :] # official code make a mistake of pos_embed to pose_embed, which disable pos_embed hm_memory = self.hm_encoder( src=hm_memory, pose_embed=hm_pos_embed, src_mask=hm_mask, value_spatial_shapes=spatial_shapes[[0]], reference_points=hm_reference_points, value_level_start_index=level_start_index[0], valid_ratios=valid_ratios[:, :1, :]) hm_memory = hm_memory.reshape((bs, spatial_shapes[0, 0], spatial_shapes[0, 1], -1)) hm_proto = (hm_memory, mlvl_masks[0]) if self.as_two_stage: output_memory, output_proposals = \ self.gen_encoder_output_proposals( memory, mask_flatten, spatial_shapes) enc_outputs_class = cls_branches[self.decoder.num_layers]( output_memory) enc_outputs_kpt_unact = \ kpt_branches[self.decoder.num_layers](output_memory) enc_outputs_kpt_unact[..., 0::2] += output_proposals[..., 0:1] enc_outputs_kpt_unact[..., 1::2] += output_proposals[..., 1:2] topk = self.two_stage_num_proposals topk_proposals = paddle.topk( enc_outputs_class[..., 0], topk, axis=1)[1].unsqueeze(-1) #paddle.take_along_axis 对应torch.gather topk_kpts_unact = paddle.take_along_axis(enc_outputs_kpt_unact, topk_proposals, 1) topk_kpts_unact = topk_kpts_unact.detach() reference_points = F.sigmoid(topk_kpts_unact) init_reference_out = reference_points # learnable query and query_pos query_pos, query = paddle.split( query_embed, query_embed.shape[1] // c, axis=1) query_pos = query_pos.unsqueeze(0).expand((bs, -1, -1)) query = query.unsqueeze(0).expand((bs, -1, -1)) else: query_pos, query = paddle.split( query_embed, query_embed.shape[1] // c, axis=1) query_pos = query_pos.unsqueeze(0).expand((bs, -1, -1)) query = query.unsqueeze(0).expand((bs, -1, -1)) reference_points = F.sigmoid(self.reference_points(query_pos)) init_reference_out = reference_points # decoder inter_states, inter_references = self.decoder( query=query, memory=memory, query_pos_embed=query_pos, memory_mask=mask_flatten, reference_points=reference_points, value_spatial_shapes=spatial_shapes, value_level_start_index=level_start_index, valid_ratios=valid_ratios, kpt_branches=kpt_branches) inter_references_out = inter_references if self.as_two_stage: return inter_states, init_reference_out, \ inter_references_out, enc_outputs_class, \ enc_outputs_kpt_unact, hm_proto, memory return inter_states, init_reference_out, \ inter_references_out, None, None, None, None, None, hm_proto def forward_refine(self, mlvl_masks, memory, reference_points_pose, img_inds, kpt_branches=None, **kwargs): mask_flatten = [] spatial_shapes = [] for lvl, mask in enumerate(mlvl_masks): bs, h, w = mask.shape spatial_shape = (h, w) spatial_shapes.append(spatial_shape) mask = mask.flatten(1) mask_flatten.append(mask) mask_flatten = paddle.concat(mask_flatten, 1) spatial_shapes_cumsum = paddle.to_tensor( np.array( spatial_shapes, dtype='int64').prod(1).cumsum(0)) spatial_shapes = paddle.to_tensor(spatial_shapes, dtype="int64") level_start_index = paddle.concat((paddle.zeros( (1, ), dtype=spatial_shapes.dtype), spatial_shapes_cumsum[:-1])) valid_ratios = paddle.stack( [self.get_valid_ratio(m) for m in mlvl_masks], 1) # pose refinement (17 queries corresponding to 17 keypoints) # learnable query and query_pos refine_query_embedding = self.refine_query_embedding.weight query_pos, query = paddle.split(refine_query_embedding, 2, axis=1) pos_num = reference_points_pose.shape[0] query_pos = query_pos.unsqueeze(0).expand((pos_num, -1, -1)) query = query.unsqueeze(0).expand((pos_num, -1, -1)) reference_points = reference_points_pose.reshape( (pos_num, reference_points_pose.shape[1] // 2, 2)) pos_memory = memory[img_inds] mask_flatten = mask_flatten[img_inds] valid_ratios = valid_ratios[img_inds] if img_inds.size == 1: pos_memory = pos_memory.unsqueeze(0) mask_flatten = mask_flatten.unsqueeze(0) valid_ratios = valid_ratios.unsqueeze(0) inter_states, inter_references = self.refine_decoder( query=query, memory=pos_memory, query_pos_embed=query_pos, memory_mask=mask_flatten, reference_points=reference_points, value_spatial_shapes=spatial_shapes, value_level_start_index=level_start_index, valid_ratios=valid_ratios, reg_branches=kpt_branches, **kwargs) # [num_decoder, num_query, bs, embed_dim] init_reference_out = reference_points return inter_states, init_reference_out, inter_references