import numpy as np from mmcv.utils import build_from_cfg from mmdet3d.core.bbox import box_np_ops from mmdet.datasets.builder import PIPELINES from mmdet.datasets.pipelines import RandomFlip from ..registry import OBJECTSAMPLERS from .data_augment_utils import noise_per_object_v3_ @PIPELINES.register_module() class RandomFlip3D(RandomFlip): """Flip the points & bbox. If the input dict contains the key "flip", then the flag will be used, otherwise it will be randomly decided by a ratio specified in the init method. Args: sync_2d (bool, optional): Whether to apply flip according to the 2D images. If True, it will apply the same flip as that to 2D images. If False, it will decide whether to flip randomly and independently to that of 2D images. flip_ratio_bev_horizontal (float, optional): The flipping probability in horizontal direction. flip_ratio_bev_vertical (float, optional): The flipping probability in vertical direction. """ def __init__(self, sync_2d=True, flip_ratio_bev_horizontal=0.0, flip_ratio_bev_vertical=0.0, **kwargs): super(RandomFlip3D, self).__init__( flip_ratio=flip_ratio_bev_horizontal, **kwargs) self.sync_2d = sync_2d self.flip_ratio_bev_vertical = flip_ratio_bev_vertical if flip_ratio_bev_horizontal is not None: assert isinstance( flip_ratio_bev_horizontal, (int, float)) and 0 <= flip_ratio_bev_horizontal <= 1 if flip_ratio_bev_vertical is not None: assert isinstance( flip_ratio_bev_vertical, (int, float)) and 0 <= flip_ratio_bev_vertical <= 1 def random_flip_data_3d(self, input_dict, direction='horizontal'): assert direction in ['horizontal', 'vertical'] if len(input_dict['bbox3d_fields']) == 0: # test mode input_dict['bbox3d_fields'].append('empty_box3d') input_dict['empty_box3d'] = input_dict['box_type_3d']( np.array([], dtype=np.float32)) assert len(input_dict['bbox3d_fields']) == 1 for key in input_dict['bbox3d_fields']: input_dict['points'] = input_dict[key].flip( direction, points=input_dict['points']) def __call__(self, input_dict): # filp 2D image and its annotations super(RandomFlip3D, self).__call__(input_dict) if self.sync_2d: input_dict['pcd_horizontal_flip'] = input_dict['flip'] input_dict['pcd_vertical_flip'] = False else: if 'pcd_horizontal_flip' not in input_dict: flip_horizontal = True if np.random.rand( ) < self.flip_ratio else False input_dict['pcd_horizontal_flip'] = flip_horizontal if 'pcd_vertical_flip' not in input_dict: flip_vertical = True if np.random.rand( ) < self.flip_ratio_bev_vertical else False input_dict['pcd_vertical_flip'] = flip_vertical if input_dict['pcd_horizontal_flip']: self.random_flip_data_3d(input_dict, 'horizontal') if input_dict['pcd_vertical_flip']: self.random_flip_data_3d(input_dict, 'vertical') return input_dict def __repr__(self): repr_str = self.__class__.__name__ repr_str += '(sync_2d={},'.format(self.sync_2d) repr_str += '(flip_ratio_bev_horizontal={},'.format( self.flip_ratio_bev_horizontal) repr_str += '(flip_ratio_bev_vertical={},'.format( self.flip_ratio_bev_vertical) return repr_str @PIPELINES.register_module() class ObjectSample(object): """Sample GT objects to the data. Args: db_sampler (dict): Config dict of the database sampler. sample_2d (bool): Whether to also paste 2D image patch to the images This should be true when applying multi-modality cut-and-paste. """ def __init__(self, db_sampler, sample_2d=False): self.sampler_cfg = db_sampler self.sample_2d = sample_2d if 'type' not in db_sampler.keys(): db_sampler['type'] = 'DataBaseSampler' self.db_sampler = build_from_cfg(db_sampler, OBJECTSAMPLERS) @staticmethod def remove_points_in_boxes(points, boxes): masks = box_np_ops.points_in_rbbox(points, boxes) points = points[np.logical_not(masks.any(-1))] return points def __call__(self, input_dict): gt_bboxes_3d = input_dict['gt_bboxes_3d'] gt_labels_3d = input_dict['gt_labels_3d'] # change to float for blending operation points = input_dict['points'] if self.sample_2d: img = input_dict['img'] gt_bboxes_2d = input_dict['gt_bboxes'] # Assume for now 3D & 2D bboxes are the same sampled_dict = self.db_sampler.sample_all( gt_bboxes_3d.tensor.numpy(), gt_labels_3d, gt_bboxes_2d=gt_bboxes_2d, img=img) else: sampled_dict = self.db_sampler.sample_all( gt_bboxes_3d.tensor.numpy(), gt_labels_3d, img=None) if sampled_dict is not None: sampled_gt_bboxes_3d = sampled_dict['gt_bboxes_3d'] sampled_points = sampled_dict['points'] sampled_gt_labels = sampled_dict['gt_labels_3d'] gt_labels_3d = np.concatenate([gt_labels_3d, sampled_gt_labels], axis=0) gt_bboxes_3d = gt_bboxes_3d.new_box( np.concatenate( [gt_bboxes_3d.tensor.numpy(), sampled_gt_bboxes_3d])) points = self.remove_points_in_boxes(points, sampled_gt_bboxes_3d) # check the points dimension dim_inds = points.shape[-1] points = np.concatenate([sampled_points[:, :dim_inds], points], axis=0) if self.sample_2d: sampled_gt_bboxes_2d = sampled_dict['gt_bboxes_2d'] gt_bboxes_2d = np.concatenate( [gt_bboxes_2d, sampled_gt_bboxes_2d]).astype(np.float32) input_dict['gt_bboxes'] = gt_bboxes_2d input_dict['img'] = sampled_dict['img'] input_dict['gt_bboxes_3d'] = gt_bboxes_3d input_dict['gt_labels_3d'] = gt_labels_3d input_dict['points'] = points return input_dict def __repr__(self): return self.__class__.__name__ @PIPELINES.register_module() class ObjectNoise(object): """Apply noise to each GT objects in the scene. Args: translation_std (list, optional): Standard deviation of the distribution where translation noise are sampled from. Defaults to [0.25, 0.25, 0.25]. global_rot_range (list, optional): Global rotation to the scene. Defaults to [0.0, 0.0]. rot_range (list, optional): Object rotation range. Defaults to [-0.15707963267, 0.15707963267]. num_try (int, optional): Number of times to try if the noise applied is invalid. Defaults to 100. """ def __init__(self, translation_std=[0.25, 0.25, 0.25], global_rot_range=[0.0, 0.0], rot_range=[-0.15707963267, 0.15707963267], num_try=100): self.translation_std = translation_std self.global_rot_range = global_rot_range self.rot_range = rot_range self.num_try = num_try def __call__(self, input_dict): gt_bboxes_3d = input_dict['gt_bboxes_3d'] points = input_dict['points'] # TODO: check this inplace function numpy_box = gt_bboxes_3d.tensor.numpy() noise_per_object_v3_( numpy_box, points, rotation_perturb=self.rot_range, center_noise_std=self.translation_std, global_random_rot_range=self.global_rot_range, num_try=self.num_try) input_dict['gt_bboxes_3d'] = gt_bboxes_3d.new_box(numpy_box) input_dict['points'] = points return input_dict def __repr__(self): repr_str = self.__class__.__name__ repr_str += '(num_try={},'.format(self.num_try) repr_str += ' translation_std={},'.format(self.translation_std) repr_str += ' global_rot_range={},'.format(self.global_rot_range) repr_str += ' rot_range={})'.format(self.rot_range) return repr_str @PIPELINES.register_module() class GlobalRotScaleTrans(object): """Apply global rotation, scaling and translation to a 3D scene. Args: rot_range (list[float]): Range of rotation angle. Default to [-0.78539816, 0.78539816] (close to [-pi/4, pi/4]). scale_ratio_range (list[float]): Range of scale ratio. Default to [0.95, 1.05]. translation_std (list[float]): The standard deviation of ranslation noise. This apply random translation to a scene by a noise, which is sampled from a gaussian distribution whose standard deviation is set by ``translation_std``. Default to [0, 0, 0] shift_height (bool): whether to shift height (the fourth dimension of indoor points) when scaling. """ def __init__(self, rot_range=[-0.78539816, 0.78539816], scale_ratio_range=[0.95, 1.05], translation_std=[0, 0, 0], shift_height=False): self.rot_range = rot_range self.scale_ratio_range = scale_ratio_range self.translation_std = translation_std self.shift_height = shift_height def _trans_bbox_points(self, input_dict): if not isinstance(self.translation_std, (list, tuple, np.ndarray)): translation_std = [ self.translation_std, self.translation_std, self.translation_std ] else: translation_std = self.translation_std translation_std = np.array(translation_std, dtype=np.float32) trans_factor = np.random.normal(scale=translation_std, size=3).T input_dict['points'][:, :3] += trans_factor input_dict['pcd_trans'] = trans_factor for key in input_dict['bbox3d_fields']: input_dict[key].translate(trans_factor) def _rot_bbox_points(self, input_dict): rotation = self.rot_range if not isinstance(rotation, list): rotation = [-rotation, rotation] noise_rotation = np.random.uniform(rotation[0], rotation[1]) for key in input_dict['bbox3d_fields']: if len(input_dict[key].tensor) != 0: points, rot_mat_T = input_dict[key].rotate( noise_rotation, input_dict['points']) input_dict['points'] = points input_dict['pcd_rotation'] = rot_mat_T def _scale_bbox_points(self, input_dict): scale = input_dict['pcd_scale_factor'] input_dict['points'][:, :3] *= scale if self.shift_height: input_dict['points'][:, -1] *= scale for key in input_dict['bbox3d_fields']: input_dict[key].scale(scale) def _random_scale(self, input_dict): scale_factor = np.random.uniform(self.scale_ratio_range[0], self.scale_ratio_range[1]) input_dict['pcd_scale_factor'] = scale_factor def __call__(self, input_dict): self._rot_bbox_points(input_dict) if 'pcd_scale_factor' not in input_dict: self._random_scale(input_dict) self._scale_bbox_points(input_dict) self._trans_bbox_points(input_dict) return input_dict def __repr__(self): repr_str = self.__class__.__name__ repr_str += '(rot_range={},'.format(self.rot_range) repr_str += ' scale_ratio_range={},'.format(self.scale_ratio_range) repr_str += ' translation_std={})'.format(self.translation_std) repr_str += ' shift_height={})'.format(self.shift_height) return repr_str @PIPELINES.register_module() class PointShuffle(object): def __call__(self, input_dict): np.random.shuffle(input_dict['points']) return input_dict def __repr__(self): return self.__class__.__name__ @PIPELINES.register_module() class ObjectRangeFilter(object): def __init__(self, point_cloud_range): self.pcd_range = np.array(point_cloud_range, dtype=np.float32) self.bev_range = self.pcd_range[[0, 1, 3, 4]] def __call__(self, input_dict): gt_bboxes_3d = input_dict['gt_bboxes_3d'] gt_labels_3d = input_dict['gt_labels_3d'] mask = gt_bboxes_3d.in_range_bev(self.bev_range) gt_bboxes_3d = gt_bboxes_3d[mask] # mask is a torch tensor but gt_labels_3d is still numpy array # using mask to index gt_labels_3d will cause bug when # len(gt_labels_3d) == 1, where mask=1 will be interpreted # as gt_labels_3d[1] and cause out of index error gt_labels_3d = gt_labels_3d[mask.numpy().astype(np.bool)] # limit rad to [-pi, pi] gt_bboxes_3d.limit_yaw(offset=0.5, period=2 * np.pi) input_dict['gt_bboxes_3d'] = gt_bboxes_3d input_dict['gt_labels_3d'] = gt_labels_3d return input_dict def __repr__(self): repr_str = self.__class__.__name__ repr_str += '(point_cloud_range={})'.format(self.pcd_range.tolist()) return repr_str @PIPELINES.register_module() class PointsRangeFilter(object): def __init__(self, point_cloud_range): self.pcd_range = np.array( point_cloud_range, dtype=np.float32)[np.newaxis, :] def __call__(self, input_dict): points = input_dict['points'] points_mask = ((points[:, :3] >= self.pcd_range[:, :3]) & (points[:, :3] < self.pcd_range[:, 3:])) points_mask = points_mask[:, 0] & points_mask[:, 1] & points_mask[:, 2] clean_points = points[points_mask, :] input_dict['points'] = clean_points return input_dict def __repr__(self): repr_str = self.__class__.__name__ repr_str += '(point_cloud_range={})'.format(self.pcd_range.tolist()) return repr_str @PIPELINES.register_module() class ObjectNameFilter(object): """Filter GT objects by their names. Args: classes (list[str]): list of class names to be kept for training """ def __init__(self, classes): self.classes = classes self.labels = list(range(len(self.classes))) def __call__(self, input_dict): gt_labels_3d = input_dict['gt_labels_3d'] gt_bboxes_mask = np.array([n in self.labels for n in gt_labels_3d], dtype=np.bool_) input_dict['gt_bboxes_3d'] = input_dict['gt_bboxes_3d'][gt_bboxes_mask] input_dict['gt_labels_3d'] = input_dict['gt_labels_3d'][gt_bboxes_mask] return input_dict def __repr__(self): repr_str = self.__class__.__name__ repr_str += f'(classes={self.classes})' return repr_str @PIPELINES.register_module() class IndoorPointSample(object): """Indoor point sample. Sampling data to a certain number. Args: name (str): Name of the dataset. num_points (int): Number of points to be sampled. """ def __init__(self, num_points): self.num_points = num_points def points_random_sampling(self, points, num_samples, replace=None, return_choices=False): """Points random sampling. Sample points to a certain number. Args: points (ndarray): 3D Points. num_samples (int): Number of samples to be sampled. replace (bool): Whether the sample is with or without replacement. return_choices (bool): Whether return choice. Returns: points (ndarray): 3D Points. choices (ndarray): The generated random samples. """ if replace is None: replace = (points.shape[0] < num_samples) choices = np.random.choice( points.shape[0], num_samples, replace=replace) if return_choices: return points[choices], choices else: return points[choices] def __call__(self, results): points = results['points'] points, choices = self.points_random_sampling( points, self.num_points, return_choices=True) pts_instance_mask = results.get('pts_instance_mask', None) pts_semantic_mask = results.get('pts_semantic_mask', None) results['points'] = points if pts_instance_mask is not None and pts_semantic_mask is not None: pts_instance_mask = pts_instance_mask[choices] pts_semantic_mask = pts_semantic_mask[choices] results['pts_instance_mask'] = pts_instance_mask results['pts_semantic_mask'] = pts_semantic_mask return results def __repr__(self): repr_str = self.__class__.__name__ repr_str += '(num_points={})'.format(self.num_points) return repr_str