import mmcv
import numpy as np
from mmdet.datasets.builder import PIPELINES
from mmdet.datasets.pipelines import LoadAnnotations


@PIPELINES.register_module()
class LoadMultiViewImageFromFiles(object):
    """Load multi channel images from a list of separate channel files.

    Expects results['img_filename'] to be a list of filenames. Every file is
    read with ``mmcv.imread`` and the images are stacked along a new last
    axis.

    Args:
        to_float32 (bool): Whether to convert the stacked image to
            np.float32. Defaults to False.
        color_type (str): Flag forwarded to ``mmcv.imread``.
            Defaults to 'unchanged'.
    """

    def __init__(self, to_float32=False, color_type='unchanged'):
        self.to_float32 = to_float32
        self.color_type = color_type

    def __call__(self, results):
        """Read all images and fill the image related keys of ``results``.

        Args:
            results (dict): Result dict containing 'img_filename'.

        Returns:
            dict: The same dict with 'filename', 'img', 'img_shape',
                'ori_shape', 'pad_shape', 'scale_factor' and 'img_norm_cfg'
                set.
        """
        filename = results['img_filename']
        # np.stack requires every image to have the same shape.
        img = np.stack(
            [mmcv.imread(name, self.color_type) for name in filename],
            axis=-1)
        if self.to_float32:
            img = img.astype(np.float32)
        results['filename'] = filename
        results['img'] = img
        results['img_shape'] = img.shape
        results['ori_shape'] = img.shape
        # Set initial values for default meta_keys
        results['pad_shape'] = img.shape
        results['scale_factor'] = 1.0
        num_channels = 1 if len(img.shape) < 3 else img.shape[2]
        # Identity normalization (zero mean, unit std) as a placeholder.
        results['img_norm_cfg'] = dict(
            mean=np.zeros(num_channels, dtype=np.float32),
            std=np.ones(num_channels, dtype=np.float32),
            to_rgb=False)
        return results

    def __repr__(self):
        return "{} (to_float32={}, color_type='{}')".format(
            self.__class__.__name__, self.to_float32, self.color_type)


@PIPELINES.register_module()
class LoadPointsFromMultiSweeps(object):
    """Load points from multiple sweeps.

    This is usually used for nuScenes dataset to utilize previous sweeps.

    Args:
        sweeps_num (int): number of sweeps
        load_dim (int): dimension number of the loaded points
        file_client_args (dict): Config dict of file clients, refer to
            https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
            for more details.
    """

    def __init__(self,
                 sweeps_num=10,
                 load_dim=5,
                 file_client_args=dict(backend='disk')):
        self.load_dim = load_dim
        self.sweeps_num = sweeps_num
        # Copy so that the shared default dict is never handed out.
        self.file_client_args = file_client_args.copy()
        # Created lazily on the first load.
        self.file_client = None

    def _load_points(self, pts_filename):
        """Read a point file as a flat float32 array.

        Args:
            pts_filename (str): Path of the point cloud file.

        Returns:
            np.ndarray: The loaded values. The array returned by
                ``np.frombuffer`` is read-only; callers copy it before
                modifying it.
        """
        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)
        try:
            pts_bytes = self.file_client.get(pts_filename)
            points = np.frombuffer(pts_bytes, dtype=np.float32)
        except ConnectionError:
            # Fall back to reading the path directly from the local disk.
            mmcv.check_file_exist(pts_filename)
            if pts_filename.endswith('.npy'):
                points = np.load(pts_filename)
            else:
                points = np.fromfile(pts_filename, dtype=np.float32)
        return points

    def __call__(self, results):
        """Append up to ``sweeps_num`` sweeps to the already loaded points.

        Args:
            results (dict): Result dict containing 'points', 'timestamp'
                and 'sweeps'. Every sweep is a dict with 'data_path',
                'timestamp', 'sensor2lidar_rotation' and
                'sensor2lidar_translation'.

        Returns:
            dict: The same dict; 'points' is replaced by the concatenation
                of all point sets restricted to columns [0, 1, 2, 4].
        """
        points = results['points']
        # NOTE: both statements modify the incoming array in place.
        # Column 3 is presumably the intensity (TODO confirm); it is
        # dropped by the final column selection below.
        points[:, 3] /= 255
        # Column 4 holds the time offset to the key frame: zero for the
        # key frame itself.
        points[:, 4] = 0
        sweep_points_list = [points]
        ts = results['timestamp']
        for idx, sweep in enumerate(results['sweeps']):
            if idx >= self.sweeps_num:
                break
            points_sweep = self._load_points(sweep['data_path'])
            # Copy first: the loaded buffer may be read-only.
            points_sweep = np.copy(points_sweep).reshape(-1, self.load_dim)
            # NOTE(review): sweep timestamps look like microseconds while
            # results['timestamp'] is used unscaled - confirm with the
            # dataset code.
            sweep_ts = sweep['timestamp'] / 1e6
            points_sweep[:, 3] /= 255
            # Transform the sweep coordinates with the provided rotation
            # and translation.
            points_sweep[:, :3] = points_sweep[:, :3] @ sweep[
                'sensor2lidar_rotation'].T
            points_sweep[:, :3] += sweep['sensor2lidar_translation']
            points_sweep[:, 4] = ts - sweep_ts
            sweep_points_list.append(points_sweep)

        # Keep x, y, z and the time offset; column 3 is discarded.
        points = np.concatenate(sweep_points_list, axis=0)[:, [0, 1, 2, 4]]
        results['points'] = points
        return results

    def __repr__(self):
        return f'{self.__class__.__name__}(sweeps_num={self.sweeps_num})'


@PIPELINES.register_module()
class PointSegClassMapping(object):
    """Map original semantic class to valid category ids.

    Map valid classes as 0~len(valid_cat_ids)-1 and
    others as len(valid_cat_ids).

    Args:
        valid_cat_ids (tuple[int]): A tuple of valid category.
    """

    def __init__(self, valid_cat_ids):
        self.valid_cat_ids = valid_cat_ids

    def __call__(self, results):
        """Convert results['pts_semantic_mask'] to contiguous class ids.

        The mask array is updated in place, so other references to it
        observe the converted labels as well.

        Args:
            results (dict): Result dict containing 'pts_semantic_mask'
                (np.ndarray).

        Returns:
            dict: The same dict with the converted 'pts_semantic_mask'.
        """
        assert 'pts_semantic_mask' in results
        pts_semantic_mask = results['pts_semantic_mask']
        neg_cls = len(self.valid_cat_ids)

        # Build the result from the untouched original labels, so a label
        # that was already converted can never be matched a second time.
        converted = np.full(
            pts_semantic_mask.shape, neg_cls, dtype=pts_semantic_mask.dtype)
        # Walk the categories backwards: if an id is listed twice, the
        # first occurrence wins, exactly like ``valid_cat_ids.index``.
        for cls_idx in reversed(range(neg_cls)):
            cat_id = self.valid_cat_ids[cls_idx]
            converted[pts_semantic_mask == cat_id] = cls_idx

        pts_semantic_mask[...] = converted
        results['pts_semantic_mask'] = pts_semantic_mask
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += '(valid_cat_ids={})'.format(self.valid_cat_ids)
        return repr_str


@PIPELINES.register_module()
class NormalizePointsColor(object):
    """Normalize color of points.

    Subtract ``color_mean / 256`` from columns 3 to 5 of the points.

    Args:
        color_mean (list[float]): Mean color of the point cloud.
    """

    def __init__(self, color_mean):
        self.color_mean = color_mean

    def __call__(self, results):
        """Shift the color columns of results['points'] in place.

        Args:
            results (dict): Result dict containing 'points' with at least
                6 columns.

        Returns:
            dict: The same dict with the updated 'points'.
        """
        points = results['points']
        assert points.shape[1] >= 6,\
            f'Expect points have channel >=6, got {points.shape[1]}'
        # NOTE(review): the points themselves are not rescaled here, so
        # they are presumably already within [0, 1] - confirm upstream.
        points[:, 3:6] = points[:, 3:6] - np.array(self.color_mean) / 256.0
        results['points'] = points
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__
        repr_str += '(color_mean={})'.format(self.color_mean)
        return repr_str


@PIPELINES.register_module()
class LoadPointsFromFile(object):
    """Load Points From File.

    Load sunrgbd and scannet points from file.

    Args:
        shift_height (bool): Whether to use shifted height.
            Defaults to False.
        load_dim (int): The dimension of the loaded points. Default: 6.
        use_dim (list[int] | int): Which dimensions of the points to be
            used. An int ``n`` selects the first ``n`` dimensions.
            Default: [0, 1, 2].
            For KITTI dataset, set use_dim=4 or use_dim=[0, 1, 2, 3]
            to use the intensity dimension
        file_client_args (dict): Config dict of file clients, refer to
            https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
            for more details.
    """

    def __init__(self,
                 load_dim=6,
                 use_dim=[0, 1, 2],
                 shift_height=False,
                 file_client_args=dict(backend='disk')):
        self.shift_height = shift_height
        if isinstance(use_dim, int):
            use_dim = list(range(use_dim))
        assert max(use_dim) < load_dim, \
            f'Expect all used dimensions < {load_dim}, got {use_dim}'

        self.load_dim = load_dim
        # Store a private copy: the default list is shared by every call
        # of __init__ and must not be reachable through an instance.
        self.use_dim = list(use_dim)
        self.file_client_args = file_client_args.copy()
        # Created lazily on the first load.
        self.file_client = None

    def _load_points(self, pts_filename):
        """Read a point file as a flat float32 array.

        Args:
            pts_filename (str): Path of the point cloud file.

        Returns:
            np.ndarray: The loaded values.
        """
        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)
        try:
            pts_bytes = self.file_client.get(pts_filename)
            points = np.frombuffer(pts_bytes, dtype=np.float32)
        except ConnectionError:
            # Fall back to reading the path directly from the local disk.
            mmcv.check_file_exist(pts_filename)
            if pts_filename.endswith('.npy'):
                points = np.load(pts_filename)
            else:
                points = np.fromfile(pts_filename, dtype=np.float32)
        return points

    def __call__(self, results):
        """Load the points referenced by results['pts_filename'].

        Args:
            results (dict): Result dict containing 'pts_filename'.

        Returns:
            dict: The same dict with 'points' set to an array holding the
                columns in ``use_dim``, plus one trailing height column
                when ``shift_height`` is True.
        """
        pts_filename = results['pts_filename']
        points = self._load_points(pts_filename)
        points = points.reshape(-1, self.load_dim)
        # Indexing with a list yields a new, writable array.
        points = points[:, self.use_dim]

        if self.shift_height:
            # np.percentile takes q in percent, so this is the 0.99th
            # percentile of z, i.e. a value close to the minimum.
            floor_height = np.percentile(points[:, 2], 0.99)
            height = points[:, 2] - floor_height
            points = np.concatenate([points, np.expand_dims(height, 1)], 1)
        results['points'] = points
        return results

    def __repr__(self):
        repr_str = self.__class__.__name__ + '('
        repr_str += 'shift_height={}, '.format(self.shift_height)
        repr_str += 'file_client_args={}, '.format(self.file_client_args)
        repr_str += 'load_dim={}, '.format(self.load_dim)
        repr_str += 'use_dim={})'.format(self.use_dim)
        return repr_str


@PIPELINES.register_module()
class LoadAnnotations3D(LoadAnnotations):
    """Load Annotations3D.

    Load instance mask and semantic mask of points and
    encapsulate the items into related fields.

    Args:
        with_bbox_3d (bool, optional): Whether to load 3D boxes.
            Defaults to True.
        with_label_3d (bool, optional): Whether to load 3D labels.
            Defaults to True.
        with_mask_3d (bool, optional): Whether to load 3D instance masks
            for points. Defaults to False.
        with_seg_3d (bool, optional): Whether to load 3D semantic masks
            for points. Defaults to False.
        with_bbox (bool, optional): Whether to load 2D boxes.
            Defaults to False.
        with_label (bool, optional): Whether to load 2D labels.
            Defaults to False.
        with_mask (bool, optional): Whether to load 2D instance masks.
            Defaults to False.
        with_seg (bool, optional): Whether to load 2D semantic masks.
            Defaults to False.
        poly2mask (bool, optional): Whether to convert polygon annotations
            to bitmasks. Defaults to True.
        file_client_args (dict): Config dict of file clients, refer to
            https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
            for more details.
    """

    def __init__(self,
                 with_bbox_3d=True,
                 with_label_3d=True,
                 with_mask_3d=False,
                 with_seg_3d=False,
                 with_bbox=False,
                 with_label=False,
                 with_mask=False,
                 with_seg=False,
                 poly2mask=True,
                 file_client_args=dict(backend='disk')):
        super().__init__(
            with_bbox,
            with_label,
            with_mask,
            with_seg,
            poly2mask,
            file_client_args=file_client_args)
        self.with_bbox_3d = with_bbox_3d
        self.with_label_3d = with_label_3d
        self.with_mask_3d = with_mask_3d
        self.with_seg_3d = with_seg_3d

    def _load_bboxes_3d(self, results):
        """Copy the 3D boxes from 'ann_info' and register the field.

        Args:
            results (dict): Result dict containing 'ann_info' and the list
                'bbox3d_fields'.

        Returns:
            dict: The same dict with 'gt_bboxes_3d' set.
        """
        results['gt_bboxes_3d'] = results['ann_info']['gt_bboxes_3d']
        results['bbox3d_fields'].append('gt_bboxes_3d')
        return results

    def _load_labels_3d(self, results):
        """Copy the 3D labels from 'ann_info'.

        Args:
            results (dict): Result dict containing 'ann_info'.

        Returns:
            dict: The same dict with 'gt_labels_3d' set.
        """
        results['gt_labels_3d'] = results['ann_info']['gt_labels_3d']
        return results

    def _load_masks_3d(self, results):
        """Load the per-point instance mask and register the field.

        Args:
            results (dict): Result dict containing 'ann_info' with
                'pts_instance_mask_path' and the list 'pts_mask_fields'.

        Returns:
            dict: The same dict with 'pts_instance_mask' set.
        """
        pts_instance_mask_path = results['ann_info']['pts_instance_mask_path']

        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)
        try:
            mask_bytes = self.file_client.get(pts_instance_mask_path)
            # ``np.int`` / ``np.long`` were aliases of the builtin ``int``
            # and no longer exist since NumPy 1.24; ``int`` is the same
            # dtype. Copy because np.frombuffer returns a read-only array.
            pts_instance_mask = np.frombuffer(mask_bytes, dtype=int).copy()
        except ConnectionError:
            # Fall back to reading the path directly from the local disk.
            mmcv.check_file_exist(pts_instance_mask_path)
            pts_instance_mask = np.fromfile(pts_instance_mask_path, dtype=int)

        results['pts_instance_mask'] = pts_instance_mask
        results['pts_mask_fields'].append('pts_instance_mask')
        return results

    def _load_semantic_seg_3d(self, results):
        """Load the per-point semantic mask and register the field.

        Args:
            results (dict): Result dict containing 'ann_info' with
                'pts_semantic_mask_path' and the list 'pts_seg_fields'.

        Returns:
            dict: The same dict with 'pts_semantic_mask' set.
        """
        pts_semantic_mask_path = results['ann_info']['pts_semantic_mask_path']

        if self.file_client is None:
            self.file_client = mmcv.FileClient(**self.file_client_args)
        try:
            mask_bytes = self.file_client.get(pts_semantic_mask_path)
            # add .copy() to fix read-only bug
            # ``np.int`` / ``np.long`` were aliases of the builtin ``int``
            # and no longer exist since NumPy 1.24; ``int`` is the same
            # dtype.
            pts_semantic_mask = np.frombuffer(mask_bytes, dtype=int).copy()
        except ConnectionError:
            # Fall back to reading the path directly from the local disk.
            mmcv.check_file_exist(pts_semantic_mask_path)
            pts_semantic_mask = np.fromfile(pts_semantic_mask_path, dtype=int)

        results['pts_semantic_mask'] = pts_semantic_mask
        results['pts_seg_fields'].append('pts_semantic_mask')
        return results

    def __call__(self, results):
        """Run the 2D loading of the parent class, then the 3D loaders.

        Args:
            results (dict): Result dict from the dataset.

        Returns:
            dict | None: The updated dict, or None when the 3D box loader
                returns None.
        """
        results = super().__call__(results)
        if self.with_bbox_3d:
            results = self._load_bboxes_3d(results)
            # Kept for loaders overridden in subclasses that may reject a
            # sample; the implementation in this class never returns None.
            if results is None:
                return None
        if self.with_label_3d:
            results = self._load_labels_3d(results)
        if self.with_mask_3d:
            results = self._load_masks_3d(results)
        if self.with_seg_3d:
            results = self._load_semantic_seg_3d(results)

        return results

    def __repr__(self):
        indent_str = ' '
        repr_str = self.__class__.__name__ + '(\n'
        repr_str += f'{indent_str}with_bbox_3d={self.with_bbox_3d}, '
        repr_str += f'{indent_str}with_label_3d={self.with_label_3d}, '
        repr_str += f'{indent_str}with_mask_3d={self.with_mask_3d}, '
        repr_str += f'{indent_str}with_seg_3d={self.with_seg_3d}, '
        repr_str += f'{indent_str}with_bbox={self.with_bbox}, '
        repr_str += f'{indent_str}with_label={self.with_label}, '
        repr_str += f'{indent_str}with_mask={self.with_mask}, '
        repr_str += f'{indent_str}with_seg={self.with_seg}, '
        repr_str += f'{indent_str}poly2mask={self.poly2mask})'
        return repr_str