# Copyright (c) OpenMMLab. All rights reserved. import math from numbers import Number from typing import Dict, List, Optional, Sequence, Tuple, Union import numpy as np import torch from mmcv.ops import Voxelization from mmengine.model import stack_batch from mmengine.utils import is_list_of from torch.nn import functional as F from mmdet3d.registry import MODELS from mmdet3d.utils import OptConfigType from mmdet.models import DetDataPreprocessor from mmdet.models.utils.misc import samplelist_boxtype2tensor @MODELS.register_module() class Det3DDataPreprocessor(DetDataPreprocessor): """Points / Image pre-processor for point clouds / vision-only / multi- modality 3D detection tasks. It provides the data pre-processing as follows - Collate and move image and point cloud data to the target device. - 1) For image data: - Pad images in inputs to the maximum size of current batch with defined ``pad_value``. The padding size can be divisible by a defined ``pad_size_divisor`` - Stack images in inputs to batch_imgs. - Convert images in inputs from bgr to rgb if the shape of input is (3, H, W). - Normalize images in inputs with defined std and mean. - Do batch augmentations during training. - 2) For point cloud data: - if no voxelization, directly return list of point cloud data. - if voxelization is applied, voxelize point cloud according to ``voxel_type`` and obtain ``voxels``. Args: voxel (bool): Whether to apply voxelziation to point cloud. voxel_type (str): Voxelization type. Two voxelization types are provided: 'hard' and 'dynamic', respectively for hard voxelization and dynamic voxelization. Defaults to 'hard'. voxel_layer (:obj:`ConfigDict`, optional): Voxelization layer config. Defaults to None. mean (Sequence[Number], optional): The pixel mean of R, G, B channels. Defaults to None. std (Sequence[Number], optional): The pixel standard deviation of R, G, B channels. Defaults to None. pad_size_divisor (int): The size of padded image should be divisible by ``pad_size_divisor``. Defaults to 1. pad_value (Number): The padded pixel value. Defaults to 0. bgr_to_rgb (bool): whether to convert image from BGR to RGB. Defaults to False. rgb_to_bgr (bool): whether to convert image from RGB to RGB. Defaults to False. batch_augments (list[dict], optional): Batch-level augmentations """ def __init__(self, voxel: bool = False, voxel_type: str = 'hard', voxel_layer: OptConfigType = None, mean: Sequence[Number] = None, std: Sequence[Number] = None, pad_size_divisor: int = 1, pad_value: Union[float, int] = 0, pad_mask: bool = False, mask_pad_value: int = 0, pad_seg: bool = False, seg_pad_value: int = 255, bgr_to_rgb: bool = False, rgb_to_bgr: bool = False, boxtype2tensor: bool = True, batch_augments: Optional[List[dict]] = None): super().__init__( mean=mean, std=std, pad_size_divisor=pad_size_divisor, pad_value=pad_value, pad_mask=pad_mask, mask_pad_value=mask_pad_value, pad_seg=pad_seg, seg_pad_value=seg_pad_value, bgr_to_rgb=bgr_to_rgb, rgb_to_bgr=rgb_to_bgr, boxtype2tensor=boxtype2tensor, batch_augments=batch_augments) self.voxel = voxel self.voxel_type = voxel_type if voxel: self.voxel_layer = Voxelization(**voxel_layer) def forward( self, data: Union[dict, List[dict]], training: bool = False ) -> Tuple[Union[dict, List[dict]], Optional[list]]: """Perform normalization、padding and bgr2rgb conversion based on ``BaseDataPreprocessor``. Args: data (dict | List[dict]): data from dataloader. The dict contains the whole batch data, when it is a list[dict], the list indicate test time augmentation. training (bool): Whether to enable training time augmentation. Defaults to False. Returns: Dict | List[Dict]: Data in the same format as the model input. """ if isinstance(data, list): num_augs = len(data) aug_batch_data = [] for aug_id in range(num_augs): single_aug_batch_data = self.simple_process( data[aug_id], training) aug_batch_data.append(single_aug_batch_data) return aug_batch_data else: return self.simple_process(data, training) def simple_process(self, data: dict, training: bool = False) -> dict: """Perform normalization、padding and bgr2rgb conversion for img data based on ``BaseDataPreprocessor``, and voxelize point cloud if `voxel` is set to be True. Args: data (dict): Data sampled from dataloader. training (bool): Whether to enable training time augmentation. Defaults to False. Returns: dict: Data in the same format as the model input. """ if 'img' in data['inputs']: batch_pad_shape = self._get_pad_shape(data) data = self.collate_data(data) inputs, data_samples = data['inputs'], data['data_samples'] batch_inputs = dict() if 'points' in inputs: batch_inputs['points'] = inputs['points'] if self.voxel: voxel_dict = self.voxelize(inputs['points']) batch_inputs['voxels'] = voxel_dict if 'imgs' in inputs: imgs = inputs['imgs'] if data_samples is not None: # NOTE the batched image size information may be useful, e.g. # in DETR, this is needed for the construction of masks, which # is then used for the transformer_head. batch_input_shape = tuple(imgs[0].size()[-2:]) for data_sample, pad_shape in zip(data_samples, batch_pad_shape): data_sample.set_metainfo({ 'batch_input_shape': batch_input_shape, 'pad_shape': pad_shape }) if self.boxtype2tensor: samplelist_boxtype2tensor(data_samples) if self.pad_mask: self.pad_gt_masks(data_samples) if self.pad_seg: self.pad_gt_sem_seg(data_samples) if training and self.batch_augments is not None: for batch_aug in self.batch_augments: imgs, data_samples = batch_aug(imgs, data_samples) batch_inputs['imgs'] = imgs return {'inputs': batch_inputs, 'data_samples': data_samples} def collate_data(self, data: dict) -> dict: """Copying data to the target device and Performs normalization、 padding and bgr2rgb conversion and stack based on ``BaseDataPreprocessor``. Collates the data sampled from dataloader into a list of dict and list of labels, and then copies tensor to the target device. Args: data (dict): Data sampled from dataloader. Returns: dict: Data in the same format as the model input. """ data = self.cast_data(data) # type: ignore if 'img' in data['inputs']: _batch_imgs = data['inputs']['img'] # Process data with `pseudo_collate`. if is_list_of(_batch_imgs, torch.Tensor): batch_imgs = [] for _batch_img in _batch_imgs: # channel transform if self._channel_conversion: _batch_img = _batch_img[[2, 1, 0], ...] # Convert to float after channel conversion to ensure # efficiency _batch_img = _batch_img.float() # Normalization. if self._enable_normalize: if self.mean.shape[0] == 3: assert _batch_img.dim( ) == 3 and _batch_img.shape[0] == 3, ( 'If the mean has 3 values, the input tensor ' 'should in shape of (3, H, W), but got the ' f'tensor with shape {_batch_img.shape}') _batch_img = (_batch_img - self.mean) / self.std batch_imgs.append(_batch_img) # Pad and stack Tensor. batch_imgs = stack_batch(batch_imgs, self.pad_size_divisor, self.pad_value) # Process data with `default_collate`. elif isinstance(_batch_imgs, torch.Tensor): assert _batch_imgs.dim() == 4, ( 'The input of `ImgDataPreprocessor` should be a NCHW ' 'tensor or a list of tensor, but got a tensor with ' f'shape: {_batch_imgs.shape}') if self._channel_conversion: _batch_imgs = _batch_imgs[:, [2, 1, 0], ...] # Convert to float after channel conversion to ensure # efficiency _batch_imgs = _batch_imgs.float() if self._enable_normalize: _batch_imgs = (_batch_imgs - self.mean) / self.std h, w = _batch_imgs.shape[2:] target_h = math.ceil( h / self.pad_size_divisor) * self.pad_size_divisor target_w = math.ceil( w / self.pad_size_divisor) * self.pad_size_divisor pad_h = target_h - h pad_w = target_w - w batch_imgs = F.pad(_batch_imgs, (0, pad_w, 0, pad_h), 'constant', self.pad_value) else: raise TypeError( 'Output of `cast_data` should be a list of dict ' 'or a tuple with inputs and data_samples, but got' f'{type(data)}: {data}') data['inputs']['imgs'] = batch_imgs data.setdefault('data_samples', None) return data def _get_pad_shape(self, data: dict) -> List[tuple]: """Get the pad_shape of each image based on data and pad_size_divisor.""" # rewrite `_get_pad_shape` for obaining image inputs. _batch_inputs = data['inputs']['img'] # Process data with `pseudo_collate`. if is_list_of(_batch_inputs, torch.Tensor): batch_pad_shape = [] for ori_input in _batch_inputs: pad_h = int( np.ceil(ori_input.shape[1] / self.pad_size_divisor)) * self.pad_size_divisor pad_w = int( np.ceil(ori_input.shape[2] / self.pad_size_divisor)) * self.pad_size_divisor batch_pad_shape.append((pad_h, pad_w)) # Process data with `default_collate`. elif isinstance(_batch_inputs, torch.Tensor): assert _batch_inputs.dim() == 4, ( 'The input of `ImgDataPreprocessor` should be a NCHW tensor ' 'or a list of tensor, but got a tensor with shape: ' f'{_batch_inputs.shape}') pad_h = int( np.ceil(_batch_inputs.shape[1] / self.pad_size_divisor)) * self.pad_size_divisor pad_w = int( np.ceil(_batch_inputs.shape[2] / self.pad_size_divisor)) * self.pad_size_divisor batch_pad_shape = [(pad_h, pad_w)] * _batch_inputs.shape[0] else: raise TypeError('Output of `cast_data` should be a list of dict ' 'or a tuple with inputs and data_samples, but got' f'{type(data)}: {data}') return batch_pad_shape @torch.no_grad() def voxelize(self, points: List[torch.Tensor]) -> Dict: """Apply voxelization to point cloud. Args: points (List[Tensor]): Point cloud in one data batch. Returns: dict[str, Tensor]: Voxelization information. - voxels (Tensor): Features of voxels, shape is MXNxC for hard voxelization, NXC for dynamic voxelization. - coors (Tensor): Coordinates of voxels, shape is Nx(1+NDim), where 1 represents the batch index. - num_points (Tensor, optional): Number of points in each voxel. - voxel_centers (Tensor, optional): Centers of voxels. """ voxel_dict = dict() if self.voxel_type == 'hard': voxels, coors, num_points, voxel_centers = [], [], [], [] for res in points: res_voxels, res_coors, res_num_points = self.voxel_layer(res) res_voxel_centers = ( res_coors[:, [2, 1, 0]] + 0.5) * res_voxels.new_tensor( self.voxel_layer.voxel_size) + res_voxels.new_tensor( self.voxel_layer.point_cloud_range[0:3]) voxels.append(res_voxels) coors.append(res_coors) num_points.append(res_num_points) voxel_centers.append(res_voxel_centers) voxels = torch.cat(voxels, dim=0) num_points = torch.cat(num_points, dim=0) voxel_centers = torch.cat(voxel_centers, dim=0) coors_batch = [] for i, coor in enumerate(coors): coor_pad = F.pad(coor, (1, 0), mode='constant', value=i) coors_batch.append(coor_pad) coors_batch = torch.cat(coors_batch, dim=0) voxel_dict['num_points'] = num_points voxel_dict['voxel_centers'] = voxel_centers elif self.voxel_type == 'dynamic': coors = [] # dynamic voxelization only provide a coors mapping for res in points: res_coors = self.voxel_layer(res) coors.append(res_coors) voxels = torch.cat(points, dim=0) coors_batch = [] for i, coor in enumerate(coors): coor_pad = F.pad(coor, (1, 0), mode='constant', value=i) coors_batch.append(coor_pad) coors_batch = torch.cat(coors_batch, dim=0) else: raise ValueError(f'Invalid voxelization type {self.voxel_type}') voxel_dict['voxels'] = voxels voxel_dict['coors'] = coors_batch return voxel_dict