from functools import partial import numpy as np from ...utils import box_utils, common_utils class DataProcessor(object): def __init__(self, processor_configs, point_cloud_range, training): self.point_cloud_range = point_cloud_range self.training = training self.mode = 'train' if training else 'test' self.grid_size = self.voxel_size = None self.data_processor_queue = [] for cur_cfg in processor_configs: cur_processor = getattr(self, cur_cfg.NAME)(config=cur_cfg) self.data_processor_queue.append(cur_processor) def mask_points_and_boxes_outside_range(self, data_dict=None, config=None): if data_dict is None: return partial(self.mask_points_and_boxes_outside_range, config=config) mask = common_utils.mask_points_by_range(data_dict['points'], self.point_cloud_range) data_dict['points'] = data_dict['points'][mask] if data_dict.get('gt_boxes', None) is not None and config.REMOVE_OUTSIDE_BOXES and self.training: mask = box_utils.mask_boxes_outside_range_numpy( data_dict['gt_boxes'], self.point_cloud_range, min_num_corners=config.get('min_num_corners', 1) ) data_dict['gt_boxes'] = data_dict['gt_boxes'][mask] return data_dict def shuffle_points(self, data_dict=None, config=None): if data_dict is None: return partial(self.shuffle_points, config=config) if config.SHUFFLE_ENABLED[self.mode]: points = data_dict['points'] shuffle_idx = np.random.permutation(points.shape[0]) points = points[shuffle_idx] data_dict['points'] = points return data_dict def transform_points_to_voxels(self, data_dict=None, config=None, voxel_generator=None): if data_dict is None: from spconv.utils import VoxelGenerator voxel_generator = VoxelGenerator( voxel_size=config.VOXEL_SIZE, point_cloud_range=self.point_cloud_range, max_num_points=config.MAX_POINTS_PER_VOXEL, max_voxels=config.MAX_NUMBER_OF_VOXELS[self.mode] ) grid_size = (self.point_cloud_range[3:6] - self.point_cloud_range[0:3]) / np.array(config.VOXEL_SIZE) self.grid_size = np.round(grid_size).astype(np.int64) self.voxel_size = config.VOXEL_SIZE return partial(self.transform_points_to_voxels, voxel_generator=voxel_generator) points = data_dict['points'] voxels, coordinates, num_points = voxel_generator.generate(points) if not data_dict['use_lead_xyz']: voxels = voxels[..., 3:] # remove xyz in voxels(N, 3) data_dict['voxels'] = voxels data_dict['voxel_coords'] = coordinates data_dict['voxel_num_points'] = num_points return data_dict def forward(self, data_dict): """ Args: data_dict: points: (N, 3 + C_in) gt_boxes: optional, (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...] gt_names: optional, (N), string ... Returns: """ for cur_processor in self.data_processor_queue: data_dict = cur_processor(data_dict=data_dict) return data_dict