import numpy as np
from mmcv.utils import build_from_cfg

from mmdet3d.core.bbox import box_np_ops
from mmdet.datasets.builder import PIPELINES
from mmdet.datasets.pipelines import RandomFlip
from ..registry import OBJECTSAMPLERS
from .data_augment_utils import noise_per_object_v3_


@PIPELINES.register_module()
class RandomFlip3D(RandomFlip):
    """Flip the points & bbox.

    If the input dict contains the key "flip", then the flag will be used,
    otherwise it will be randomly decided by a ratio specified in the init
    method.

    Args:
        sync_2d (bool, optional): Whether to apply flip according to the 2D
            images. If True, it will apply the same flip as that to 2D images.
            If False, it will decide whether to flip randomly and independently
            to that of 2D images. Defaults to True.
        flip_ratio_bev_horizontal (float, optional): The flipping probability
            in horizontal direction. Defaults to 0.0.
        flip_ratio_bev_vertical (float, optional): The flipping probability
            in vertical direction. Defaults to 0.0.
    """

    def __init__(self,
                 sync_2d=True,
                 flip_ratio_bev_horizontal=0.0,
                 flip_ratio_bev_vertical=0.0,
                 **kwargs):
        super(RandomFlip3D, self).__init__(
            flip_ratio=flip_ratio_bev_horizontal, **kwargs)
        self.sync_2d = sync_2d
        # Stored explicitly because ``__repr__`` reports it; the parent class
        # only receives it under the name ``flip_ratio``.
        self.flip_ratio_bev_horizontal = flip_ratio_bev_horizontal
        self.flip_ratio_bev_vertical = flip_ratio_bev_vertical
        if flip_ratio_bev_horizontal is not None:
            assert isinstance(
                flip_ratio_bev_horizontal,
                (int, float)) and 0 <= flip_ratio_bev_horizontal <= 1
        if flip_ratio_bev_vertical is not None:
            assert isinstance(
                flip_ratio_bev_vertical,
                (int, float)) and 0 <= flip_ratio_bev_vertical <= 1

    def random_flip_data_3d(self, input_dict, direction='horizontal'):
        """Flip 3D data in place.

        The boxes listed in ``input_dict['bbox3d_fields']`` are flipped
        together with ``input_dict['points']``. Nothing is returned; the
        'points' entry of ``input_dict`` is replaced by the value returned
        from the box's ``flip`` method.

        Args:
            input_dict (dict): Result dict from loading pipeline.
            direction (str): Flip direction, 'horizontal' or 'vertical'.
                Default: horizontal.
        """
        assert direction in ['horizontal', 'vertical']
        if len(input_dict['bbox3d_fields']) == 0:  # test mode
            # No boxes available: create an empty box container so that its
            # ``flip`` method can still be used to flip the points.
            input_dict['bbox3d_fields'].append('empty_box3d')
            input_dict['empty_box3d'] = input_dict['box_type_3d'](
                np.array([], dtype=np.float32))
        assert len(input_dict['bbox3d_fields']) == 1
        for key in input_dict['bbox3d_fields']:
            input_dict['points'] = input_dict[key].flip(
                direction, points=input_dict['points'])

    def __call__(self, input_dict):
        """Call function to flip points, values in the ``bbox3d_fields`` and
        also flip 2D image and its annotations.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Flipped results, 'flip', 'flip_direction',
                'pcd_horizontal_flip' and 'pcd_vertical_flip' keys are added
                into result dict.
        """
        # flip 2D image and its annotations
        super(RandomFlip3D, self).__call__(input_dict)

        if self.sync_2d:
            # Follow the decision already taken for the 2D image.
            input_dict['pcd_horizontal_flip'] = input_dict['flip']
            input_dict['pcd_vertical_flip'] = False
        else:
            # Flags already present in the dict take precedence over
            # drawing new random decisions.
            if 'pcd_horizontal_flip' not in input_dict:
                input_dict['pcd_horizontal_flip'] = bool(
                    np.random.rand() < self.flip_ratio)
            if 'pcd_vertical_flip' not in input_dict:
                input_dict['pcd_vertical_flip'] = bool(
                    np.random.rand() < self.flip_ratio_bev_vertical)

        if input_dict['pcd_horizontal_flip']:
            self.random_flip_data_3d(input_dict, 'horizontal')
        if input_dict['pcd_vertical_flip']:
            self.random_flip_data_3d(input_dict, 'vertical')
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += '(sync_2d={},'.format(self.sync_2d)
        repr_str += ' flip_ratio_bev_horizontal={},'.format(
            self.flip_ratio_bev_horizontal)
        repr_str += ' flip_ratio_bev_vertical={})'.format(
            self.flip_ratio_bev_vertical)
        return repr_str


@PIPELINES.register_module()
class ObjectSample(object):
    """Sample GT objects to the data.

    Args:
        db_sampler (dict): Config dict of the database sampler. If it has no
            'type' key, 'DataBaseSampler' is filled in (the dict passed in
            is modified).
        sample_2d (bool): Whether to also paste 2D image patch to the images
            This should be true when applying multi-modality cut-and-paste.
            Defaults to False.
    """

    def __init__(self, db_sampler, sample_2d=False):
        self.sampler_cfg = db_sampler
        self.sample_2d = sample_2d
        if 'type' not in db_sampler:
            db_sampler['type'] = 'DataBaseSampler'
        self.db_sampler = build_from_cfg(db_sampler, OBJECTSAMPLERS)

    @staticmethod
    def remove_points_in_boxes(points, boxes):
        """Remove the points in the sampled bounding boxes.

        Args:
            points (np.ndarray): Input point cloud array.
            boxes (np.ndarray): Sampled ground truth boxes.

        Returns:
            np.ndarray: Points with those in the boxes removed.
        """
        masks = box_np_ops.points_in_rbbox(points, boxes)
        # Keep only the points that fall inside none of the boxes.
        points = points[np.logical_not(masks.any(-1))]
        return points

    def __call__(self, input_dict):
        """Call function to sample ground truth objects to the data.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after object sampling augmentation,
                'points', 'gt_bboxes_3d', 'gt_labels_3d' keys are updated
                in the result dict.
        """
        gt_bboxes_3d = input_dict['gt_bboxes_3d']
        gt_labels_3d = input_dict['gt_labels_3d']
        points = input_dict['points']

        if self.sample_2d:
            img = input_dict['img']
            gt_bboxes_2d = input_dict['gt_bboxes']
            # Assume for now 3D & 2D bboxes are the same
            sampled_dict = self.db_sampler.sample_all(
                gt_bboxes_3d.tensor.numpy(),
                gt_labels_3d,
                gt_bboxes_2d=gt_bboxes_2d,
                img=img)
        else:
            sampled_dict = self.db_sampler.sample_all(
                gt_bboxes_3d.tensor.numpy(), gt_labels_3d, img=None)

        # ``None`` means the sampler produced nothing to add; the inputs are
        # then written back unchanged.
        if sampled_dict is not None:
            sampled_gt_bboxes_3d = sampled_dict['gt_bboxes_3d']
            sampled_points = sampled_dict['points']
            sampled_gt_labels = sampled_dict['gt_labels_3d']

            gt_labels_3d = np.concatenate([gt_labels_3d, sampled_gt_labels],
                                          axis=0)
            gt_bboxes_3d = gt_bboxes_3d.new_box(
                np.concatenate(
                    [gt_bboxes_3d.tensor.numpy(), sampled_gt_bboxes_3d]))

            # Clear the space occupied by the sampled boxes before pasting
            # the sampled points in.
            points = self.remove_points_in_boxes(points, sampled_gt_bboxes_3d)
            # check the points dimension
            dim_inds = points.shape[-1]
            points = np.concatenate([sampled_points[:, :dim_inds], points],
                                    axis=0)

            if self.sample_2d:
                sampled_gt_bboxes_2d = sampled_dict['gt_bboxes_2d']
                gt_bboxes_2d = np.concatenate(
                    [gt_bboxes_2d, sampled_gt_bboxes_2d]).astype(np.float32)

                input_dict['gt_bboxes'] = gt_bboxes_2d
                input_dict['img'] = sampled_dict['img']

        input_dict['gt_bboxes_3d'] = gt_bboxes_3d
        input_dict['gt_labels_3d'] = gt_labels_3d
        input_dict['points'] = points

        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f' sample_2d={self.sample_2d},'
        repr_str += f' data_root={self.sampler_cfg.data_root},'
        repr_str += f' info_path={self.sampler_cfg.info_path},'
        repr_str += f' rate={self.sampler_cfg.rate},'
        repr_str += f' prepare={self.sampler_cfg.prepare},'
        repr_str += f' classes={self.sampler_cfg.classes},'
        repr_str += f' sample_groups={self.sampler_cfg.sample_groups}'
        return repr_str


@PIPELINES.register_module()
class ObjectNoise(object):
    """Apply noise to each GT objects in the scene.

    Args:
        translation_std (list[float], optional): Standard deviation of the
            distribution where translation noise are sampled from.
            Defaults to [0.25, 0.25, 0.25].
        global_rot_range (list[float], optional): Global rotation to the scene.
            Defaults to [0.0, 0.0].
        rot_range (list[float], optional): Object rotation range.
            Defaults to [-0.15707963267, 0.15707963267].
        num_try (int, optional): Number of times to try if the noise applied is
            invalid. Defaults to 100.
    """

    def __init__(self,
                 translation_std=[0.25, 0.25, 0.25],
                 global_rot_range=[0.0, 0.0],
                 rot_range=[-0.15707963267, 0.15707963267],
                 num_try=100):
        # NOTE: the list defaults are shared between instances; they are only
        # stored and passed on here, never modified in this class.
        self.translation_std = translation_std
        self.global_rot_range = global_rot_range
        self.rot_range = rot_range
        self.num_try = num_try

    def __call__(self, input_dict):
        """Call function to apply noise to each ground truth in the scene.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after adding noise to each object,
                'points', 'gt_bboxes_3d' keys are updated in the result dict.
        """
        gt_bboxes_3d = input_dict['gt_bboxes_3d']
        points = input_dict['points']

        # TODO: check this inplace function
        numpy_box = gt_bboxes_3d.tensor.numpy()
        noise_per_object_v3_(
            numpy_box,
            points,
            rotation_perturb=self.rot_range,
            center_noise_std=self.translation_std,
            global_random_rot_range=self.global_rot_range,
            num_try=self.num_try)

        input_dict['gt_bboxes_3d'] = gt_bboxes_3d.new_box(numpy_box)
        input_dict['points'] = points
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += '(num_try={},'.format(self.num_try)
        repr_str += ' translation_std={},'.format(self.translation_std)
        repr_str += ' global_rot_range={},'.format(self.global_rot_range)
        repr_str += ' rot_range={})'.format(self.rot_range)
        return repr_str


@PIPELINES.register_module()
class GlobalRotScaleTrans(object):
    """Apply global rotation, scaling and translation to a 3D scene.

    Args:
        rot_range (list[float]): Range of rotation angle.
            Defaults to [-0.78539816, 0.78539816] (close to [-pi/4, pi/4]).
            A single number ``r`` is interpreted as the range ``[-r, r]``.
        scale_ratio_range (list[float]): Range of scale ratio.
            Defaults to [0.95, 1.05].
        translation_std (list[float]): The standard deviation of translation
            noise. This apply random translation to a scene by a noise, which
            is sampled from a gaussian distribution whose standard deviation
            is set by ``translation_std``. A single number is used for all
            three axes. Defaults to [0, 0, 0]
        shift_height (bool): Whether to shift height.
            (the fourth dimension of indoor points) when scaling.
            Defaults to False.
    """

    def __init__(self,
                 rot_range=[-0.78539816, 0.78539816],
                 scale_ratio_range=[0.95, 1.05],
                 translation_std=[0, 0, 0],
                 shift_height=False):
        self.rot_range = rot_range
        self.scale_ratio_range = scale_ratio_range
        self.translation_std = translation_std
        self.shift_height = shift_height

    def _trans_bbox_points(self, input_dict):
        """Private function to translate bounding boxes and points.

        Works in place and returns nothing: 'points', 'pcd_trans' and the
        keys in ``input_dict['bbox3d_fields']`` are updated in
        ``input_dict``.

        Args:
            input_dict (dict): Result dict from loading pipeline.
        """
        if not isinstance(self.translation_std, (list, tuple, np.ndarray)):
            # A scalar std is applied to all three axes.
            translation_std = [
                self.translation_std, self.translation_std,
                self.translation_std
            ]
        else:
            translation_std = self.translation_std
        translation_std = np.array(translation_std, dtype=np.float32)
        trans_factor = np.random.normal(scale=translation_std, size=3).T

        input_dict['points'][:, :3] += trans_factor
        input_dict['pcd_trans'] = trans_factor
        for key in input_dict['bbox3d_fields']:
            input_dict[key].translate(trans_factor)

    def _rot_bbox_points(self, input_dict):
        """Private function to rotate bounding boxes and points.

        Works in place and returns nothing: 'points', 'pcd_rotation' and the
        keys in ``input_dict['bbox3d_fields']`` are updated in
        ``input_dict``.

        Args:
            input_dict (dict): Result dict from loading pipeline.
        """
        rotation = self.rot_range
        # Accept the same sequence types as ``_trans_bbox_points``; anything
        # else is treated as a scalar half-range.
        if not isinstance(rotation, (list, tuple, np.ndarray)):
            rotation = [-rotation, rotation]
        noise_rotation = np.random.uniform(rotation[0], rotation[1])

        # NOTE(review): the points are rotated through the box object's
        # ``rotate`` method, so when there is no box field with a non-empty
        # tensor the points stay unrotated and 'pcd_rotation' is not set.
        for key in input_dict['bbox3d_fields']:
            if len(input_dict[key].tensor) != 0:
                points, rot_mat_T = input_dict[key].rotate(
                    noise_rotation, input_dict['points'])
                input_dict['points'] = points
                input_dict['pcd_rotation'] = rot_mat_T

    def _scale_bbox_points(self, input_dict):
        """Private function to scale bounding boxes and points.

        Works in place and returns nothing: 'points' and the keys in
        ``input_dict['bbox3d_fields']`` are updated in ``input_dict``. The
        factor is read from ``input_dict['pcd_scale_factor']``.

        Args:
            input_dict (dict): Result dict from loading pipeline.
        """
        scale = input_dict['pcd_scale_factor']
        input_dict['points'][:, :3] *= scale
        if self.shift_height:
            # The last point column is scaled as well when shift_height is
            # enabled.
            input_dict['points'][:, -1] *= scale

        for key in input_dict['bbox3d_fields']:
            input_dict[key].scale(scale)

    def _random_scale(self, input_dict):
        """Private function to randomly set the scale factor.

        Works in place and returns nothing: 'pcd_scale_factor' is written
        into ``input_dict``.

        Args:
            input_dict (dict): Result dict from loading pipeline.
        """
        scale_factor = np.random.uniform(self.scale_ratio_range[0],
                                         self.scale_ratio_range[1])
        input_dict['pcd_scale_factor'] = scale_factor

    def __call__(self, input_dict):
        """Call function to rotate, scale and translate bounding boxes and
        points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after scaling, 'points', 'pcd_rotation',
                'pcd_scale_factor', 'pcd_trans' and keys in
                input_dict['bbox3d_fields'] are updated in the result dict.
        """
        self._rot_bbox_points(input_dict)

        # A scale factor already present in the dict is reused.
        if 'pcd_scale_factor' not in input_dict:
            self._random_scale(input_dict)
        self._scale_bbox_points(input_dict)

        self._trans_bbox_points(input_dict)
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += '(rot_range={},'.format(self.rot_range)
        repr_str += ' scale_ratio_range={},'.format(self.scale_ratio_range)
        repr_str += ' translation_std={},'.format(self.translation_std)
        repr_str += ' shift_height={})'.format(self.shift_height)
        return repr_str


@PIPELINES.register_module()
class PointShuffle(object):
    """Shuffle input points."""

    def __call__(self, input_dict):
        """Call function to shuffle points.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after shuffling, 'points' is shuffled in place
                in the result dict.
        """
        np.random.shuffle(input_dict['points'])
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        return self.__class__.__name__


@PIPELINES.register_module()
class ObjectRangeFilter(object):
    """Filter objects by the range.

    Args:
        point_cloud_range (list[float]): Point cloud range.
    """

    def __init__(self, point_cloud_range):
        self.pcd_range = np.array(point_cloud_range, dtype=np.float32)
        # Entries 0, 1, 3 and 4 of the range are used for the BEV check.
        self.bev_range = self.pcd_range[[0, 1, 3, 4]]

    def __call__(self, input_dict):
        """Call function to filter objects by the range.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d'
                keys are updated in the result dict.
        """
        gt_bboxes_3d = input_dict['gt_bboxes_3d']
        gt_labels_3d = input_dict['gt_labels_3d']
        mask = gt_bboxes_3d.in_range_bev(self.bev_range)
        gt_bboxes_3d = gt_bboxes_3d[mask]
        # mask is a torch tensor but gt_labels_3d is still numpy array
        # using mask to index gt_labels_3d will cause bug when
        # len(gt_labels_3d) == 1, where mask=1 will be interpreted
        # as gt_labels_3d[1] and cause out of index error
        # ``np.bool_`` is used because the ``np.bool`` alias was removed
        # from NumPy.
        gt_labels_3d = gt_labels_3d[mask.numpy().astype(np.bool_)]

        # limit rad to [-pi, pi]
        gt_bboxes_3d.limit_yaw(offset=0.5, period=2 * np.pi)
        input_dict['gt_bboxes_3d'] = gt_bboxes_3d
        input_dict['gt_labels_3d'] = gt_labels_3d

        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += '(point_cloud_range={})'.format(self.pcd_range.tolist())
        return repr_str


@PIPELINES.register_module()
class PointsRangeFilter(object):
    """Filter points by the range.

    Args:
        point_cloud_range (list[float]): Point cloud range.
    """

    def __init__(self, point_cloud_range):
        # A leading axis is added so the range broadcasts against the
        # (num_points, 3) coordinate slice in ``__call__``.
        self.pcd_range = np.array(
            point_cloud_range, dtype=np.float32)[np.newaxis, :]

    def __call__(self, input_dict):
        """Call function to filter points by the range.

        A point is kept when each of its first three coordinates is greater
        than or equal to the lower bound and strictly less than the upper
        bound.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'points' keys are updated
                in the result dict.
        """
        points = input_dict['points']
        points_mask = ((points[:, :3] >= self.pcd_range[:, :3])
                       & (points[:, :3] < self.pcd_range[:, 3:]))
        points_mask = points_mask[:, 0] & points_mask[:, 1] & points_mask[:, 2]
        clean_points = points[points_mask, :]
        input_dict['points'] = clean_points
        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += '(point_cloud_range={})'.format(self.pcd_range.tolist())
        return repr_str


@PIPELINES.register_module()
class ObjectNameFilter(object):
    """Filter GT objects by their names.

    Args:
        classes (list[str]): List of class names to be kept for training.
    """

    def __init__(self, classes):
        self.classes = classes
        # Labels are matched by index: 0 .. len(classes) - 1 are kept.
        self.labels = list(range(len(self.classes)))

    def __call__(self, input_dict):
        """Call function to filter objects by their names.

        Args:
            input_dict (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after filtering, 'gt_bboxes_3d', 'gt_labels_3d'
                keys are updated in the result dict.
        """
        gt_labels_3d = input_dict['gt_labels_3d']
        gt_bboxes_mask = np.array([n in self.labels for n in gt_labels_3d],
                                  dtype=np.bool_)
        input_dict['gt_bboxes_3d'] = input_dict['gt_bboxes_3d'][gt_bboxes_mask]
        input_dict['gt_labels_3d'] = input_dict['gt_labels_3d'][gt_bboxes_mask]

        return input_dict

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += f'(classes={self.classes})'
        return repr_str


@PIPELINES.register_module()
class IndoorPointSample(object):
    """Indoor point sample.

    Sampling data to a certain number.

    Args:
        num_points (int): Number of points to be sampled.
    """

    def __init__(self, num_points):
        self.num_points = num_points

    def points_random_sampling(self,
                               points,
                               num_samples,
                               replace=None,
                               return_choices=False):
        """Points random sampling.

        Sample points to a certain number.

        Args:
            points (np.ndarray): 3D Points.
            num_samples (int): Number of samples to be sampled.
            replace (bool): Whether the sample is with or without replacement.
                Defaults to None, in which case replacement is used only
                when there are fewer points than ``num_samples``.
            return_choices (bool): Whether return choice. Defaults to False.

        Returns:
            tuple[np.ndarray] | np.ndarray:

                - points (np.ndarray): 3D Points.
                - choices (np.ndarray, optional): The generated random samples.
        """
        if replace is None:
            replace = (points.shape[0] < num_samples)
        choices = np.random.choice(
            points.shape[0], num_samples, replace=replace)
        if return_choices:
            return points[choices], choices
        return points[choices]

    def __call__(self, results):
        """Call function to sample points to in indoor scenes.

        Args:
            results (dict): Result dict from loading pipeline.

        Returns:
            dict: Results after sampling, 'points', 'pts_instance_mask'
                and 'pts_semantic_mask' keys are updated in the result dict.
        """
        points = results['points']
        points, choices = self.points_random_sampling(
            points, self.num_points, return_choices=True)
        results['points'] = points

        # Each mask is re-indexed on its own so that a mask stays aligned
        # with the sampled points even when the other mask is absent.
        pts_instance_mask = results.get('pts_instance_mask', None)
        if pts_instance_mask is not None:
            results['pts_instance_mask'] = pts_instance_mask[choices]

        pts_semantic_mask = results.get('pts_semantic_mask', None)
        if pts_semantic_mask is not None:
            results['pts_semantic_mask'] = pts_semantic_mask[choices]

        return results

    def __repr__(self):
        """str: Return a string that describes the module."""
        repr_str = self.__class__.__name__
        repr_str += '(num_points={})'.format(self.num_points)
        return repr_str