import copy from typing import Tuple, Union import torch import torch.nn as nn from torch import Tensor from mmdet.models.detectors.base import BaseDetector from mmdet.registry import MODELS from mmdet.structures import OptSampleList, SampleList from mmdet.utils import InstanceList, OptConfigType, OptMultiConfig @MODELS.register_module() class CoDETR(BaseDetector): def __init__( self, backbone, neck=None, query_head=None, # detr head rpn_head=None, # two-stage rpn roi_head=[None], # two-stage bbox_head=[None], # one-stage train_cfg=[None, None], test_cfg=[None, None], # Control whether to consider positive samples # from the auxiliary head as additional positive queries. with_pos_coord=True, use_lsj=True, eval_module='detr', # Evaluate the Nth head. eval_index=0, data_preprocessor: OptConfigType = None, init_cfg: OptMultiConfig = None): super(CoDETR, self).__init__( data_preprocessor=data_preprocessor, init_cfg=init_cfg) self.with_pos_coord = with_pos_coord self.use_lsj = use_lsj assert eval_module in ['detr', 'one-stage', 'two-stage'] self.eval_module = eval_module self.backbone = MODELS.build(backbone) if neck is not None: self.neck = MODELS.build(neck) # Module index for evaluation self.eval_index = eval_index head_idx = 0 if query_head is not None: query_head.update(train_cfg=train_cfg[head_idx] if ( train_cfg is not None and train_cfg[head_idx] is not None ) else None) query_head.update(test_cfg=test_cfg[head_idx]) self.query_head = MODELS.build(query_head) self.query_head.init_weights() head_idx += 1 if rpn_head is not None: rpn_train_cfg = train_cfg[head_idx].rpn if ( train_cfg is not None and train_cfg[head_idx] is not None) else None rpn_head_ = rpn_head.copy() rpn_head_.update( train_cfg=rpn_train_cfg, test_cfg=test_cfg[head_idx].rpn) self.rpn_head = MODELS.build(rpn_head_) self.rpn_head.init_weights() self.roi_head = nn.ModuleList() for i in range(len(roi_head)): if roi_head[i]: rcnn_train_cfg = train_cfg[i + head_idx].rcnn if ( train_cfg and train_cfg[i + head_idx] is not None) else None roi_head[i].update(train_cfg=rcnn_train_cfg) roi_head[i].update(test_cfg=test_cfg[i + head_idx].rcnn) self.roi_head.append(MODELS.build(roi_head[i])) self.roi_head[-1].init_weights() self.bbox_head = nn.ModuleList() for i in range(len(bbox_head)): if bbox_head[i]: bbox_head[i].update( train_cfg=train_cfg[i + head_idx + len(self.roi_head)] if ( train_cfg and train_cfg[i + head_idx + len(self.roi_head)] is not None ) else None) bbox_head[i].update(test_cfg=test_cfg[i + head_idx + len(self.roi_head)]) self.bbox_head.append(MODELS.build(bbox_head[i])) self.bbox_head[-1].init_weights() self.head_idx = head_idx self.train_cfg = train_cfg self.test_cfg = test_cfg @property def with_rpn(self): """bool: whether the detector has RPN""" return hasattr(self, 'rpn_head') and self.rpn_head is not None @property def with_query_head(self): """bool: whether the detector has a RoI head""" return hasattr(self, 'query_head') and self.query_head is not None @property def with_roi_head(self): """bool: whether the detector has a RoI head""" return hasattr(self, 'roi_head') and self.roi_head is not None and len( self.roi_head) > 0 @property def with_shared_head(self): """bool: whether the detector has a shared head in the RoI Head""" return hasattr(self, 'roi_head') and self.roi_head[0].with_shared_head @property def with_bbox(self): """bool: whether the detector has a bbox head""" return ((hasattr(self, 'roi_head') and self.roi_head is not None and len(self.roi_head) > 0) or (hasattr(self, 'bbox_head') and self.bbox_head is not None and len(self.bbox_head) > 0)) def extract_feat(self, batch_inputs: Tensor) -> Tuple[Tensor]: """Extract features. Args: batch_inputs (Tensor): Image tensor, has shape (bs, dim, H, W). Returns: tuple[Tensor]: Tuple of feature maps from neck. Each feature map has shape (bs, dim, H, W). """ x = self.backbone(batch_inputs) if self.with_neck: x = self.neck(x) return x def _forward(self, batch_inputs: Tensor, batch_data_samples: OptSampleList = None): pass def loss(self, batch_inputs: Tensor, batch_data_samples: SampleList) -> Union[dict, list]: batch_input_shape = batch_data_samples[0].batch_input_shape if self.use_lsj: for data_samples in batch_data_samples: img_metas = data_samples.metainfo input_img_h, input_img_w = batch_input_shape img_metas['img_shape'] = [input_img_h, input_img_w] x = self.extract_feat(batch_inputs) losses = dict() def upd_loss(losses, idx, weight=1): new_losses = dict() for k, v in losses.items(): new_k = '{}{}'.format(k, idx) if isinstance(v, list) or isinstance(v, tuple): new_losses[new_k] = [i * weight for i in v] else: new_losses[new_k] = v * weight return new_losses # DETR encoder and decoder forward if self.with_query_head: bbox_losses, x = self.query_head.loss(x, batch_data_samples) losses.update(bbox_losses) # RPN forward and loss if self.with_rpn: proposal_cfg = self.train_cfg[self.head_idx].get( 'rpn_proposal', self.test_cfg[self.head_idx].rpn) rpn_data_samples = copy.deepcopy(batch_data_samples) # set cat_id of gt_labels to 0 in RPN for data_sample in rpn_data_samples: data_sample.gt_instances.labels = \ torch.zeros_like(data_sample.gt_instances.labels) rpn_losses, proposal_list = self.rpn_head.loss_and_predict( x, rpn_data_samples, proposal_cfg=proposal_cfg) # avoid get same name with roi_head loss keys = rpn_losses.keys() for key in list(keys): if 'loss' in key and 'rpn' not in key: rpn_losses[f'rpn_{key}'] = rpn_losses.pop(key) losses.update(rpn_losses) else: assert batch_data_samples[0].get('proposals', None) is not None # use pre-defined proposals in InstanceData for the second stage # to extract ROI features. proposal_list = [ data_sample.proposals for data_sample in batch_data_samples ] positive_coords = [] for i in range(len(self.roi_head)): roi_losses = self.roi_head[i].loss(x, proposal_list, batch_data_samples) if self.with_pos_coord: positive_coords.append(roi_losses.pop('pos_coords')) else: if 'pos_coords' in roi_losses.keys(): roi_losses.pop('pos_coords') roi_losses = upd_loss(roi_losses, idx=i) losses.update(roi_losses) for i in range(len(self.bbox_head)): bbox_losses = self.bbox_head[i].loss(x, batch_data_samples) if self.with_pos_coord: pos_coords = bbox_losses.pop('pos_coords') positive_coords.append(pos_coords) else: if 'pos_coords' in bbox_losses.keys(): bbox_losses.pop('pos_coords') bbox_losses = upd_loss(bbox_losses, idx=i + len(self.roi_head)) losses.update(bbox_losses) if self.with_pos_coord and len(positive_coords) > 0: for i in range(len(positive_coords)): bbox_losses = self.query_head.loss_aux(x, positive_coords[i], i, batch_data_samples) bbox_losses = upd_loss(bbox_losses, idx=i) losses.update(bbox_losses) return losses def predict(self, batch_inputs: Tensor, batch_data_samples: SampleList, rescale: bool = True) -> SampleList: """Predict results from a batch of inputs and data samples with post- processing. Args: batch_inputs (Tensor): Inputs, has shape (bs, dim, H, W). batch_data_samples (List[:obj:`DetDataSample`]): The batch data samples. It usually includes information such as `gt_instance` or `gt_panoptic_seg` or `gt_sem_seg`. rescale (bool): Whether to rescale the results. Defaults to True. Returns: list[:obj:`DetDataSample`]: Detection results of the input images. Each DetDataSample usually contain 'pred_instances'. And the `pred_instances` usually contains following keys. - scores (Tensor): Classification scores, has a shape (num_instance, ) - labels (Tensor): Labels of bboxes, has a shape (num_instances, ). - bboxes (Tensor): Has a shape (num_instances, 4), the last dimension 4 arrange as (x1, y1, x2, y2). """ assert self.eval_module in ['detr', 'one-stage', 'two-stage'] if self.use_lsj: for data_samples in batch_data_samples: img_metas = data_samples.metainfo input_img_h, input_img_w = img_metas['batch_input_shape'] img_metas['img_shape'] = [input_img_h, input_img_w] img_feats = self.extract_feat(batch_inputs) if self.with_bbox and self.eval_module == 'one-stage': results_list = self.predict_bbox_head( img_feats, batch_data_samples, rescale=rescale) elif self.with_roi_head and self.eval_module == 'two-stage': results_list = self.predict_roi_head( img_feats, batch_data_samples, rescale=rescale) else: results_list = self.predict_query_head( img_feats, batch_data_samples, rescale=rescale) batch_data_samples = self.add_pred_to_datasample( batch_data_samples, results_list) return batch_data_samples def predict_query_head(self, mlvl_feats: Tuple[Tensor], batch_data_samples: SampleList, rescale: bool = True) -> InstanceList: return self.query_head.predict( mlvl_feats, batch_data_samples=batch_data_samples, rescale=rescale) def predict_roi_head(self, mlvl_feats: Tuple[Tensor], batch_data_samples: SampleList, rescale: bool = True) -> InstanceList: assert self.with_bbox, 'Bbox head must be implemented.' if self.with_query_head: batch_img_metas = [ data_samples.metainfo for data_samples in batch_data_samples ] results = self.query_head.forward(mlvl_feats, batch_img_metas) mlvl_feats = results[-1] rpn_results_list = self.rpn_head.predict( mlvl_feats, batch_data_samples, rescale=False) return self.roi_head[self.eval_index].predict( mlvl_feats, rpn_results_list, batch_data_samples, rescale=rescale) def predict_bbox_head(self, mlvl_feats: Tuple[Tensor], batch_data_samples: SampleList, rescale: bool = True) -> InstanceList: assert self.with_bbox, 'Bbox head must be implemented.' if self.with_query_head: batch_img_metas = [ data_samples.metainfo for data_samples in batch_data_samples ] results = self.query_head.forward(mlvl_feats, batch_img_metas) mlvl_feats = results[-1] return self.bbox_head[self.eval_index].predict( mlvl_feats, batch_data_samples, rescale=rescale)