# Copyright (c) OpenMMLab. All rights reserved.
import copy
import tempfile
from os import path as osp
from typing import Callable, List, Optional, Union

import mmcv
import numpy as np
import torch
from mmcv.utils import print_log

from mmdet3d.datasets import DATASETS
from ..core import show_multi_modality_result, show_result
from ..core.bbox import (Box3DMode, CameraInstance3DBoxes, Coord3DMode,
                         LiDARInstance3DBoxes, points_cam2img)
from .det3d_dataset import Det3DDataset
from .pipelines import Compose


@DATASETS.register_module()
class KittiDataset(Det3DDataset):
    r"""KITTI Dataset.

    This class serves as the API for experiments on the
    `KITTI Dataset <http://www.cvlibs.net/datasets/kitti>`_.

    Args:
        data_root (str): Path of dataset root.
        ann_file (str): Path of annotation file.
        pipeline (list[dict], optional): Pipeline used for data processing.
            Defaults to [].
        modality (dict, optional): Modality to specify the sensor data used
            as input. Defaults to `dict(use_lidar=True)`.
        box_type_3d (str, optional): Type of 3D box of this dataset.
            Based on the `box_type_3d`, the dataset will encapsulate the box
            to its original format then convert them to `box_type_3d`.
            Defaults to 'LiDAR' in this dataset. Available options include:

            - 'LiDAR': Box in LiDAR coordinates.
            - 'Depth': Box in depth coordinates, usually for indoor datasets.
            - 'Camera': Box in camera coordinates.
        filter_empty_gt (bool, optional): Whether to filter empty GT.
            Defaults to True.
        test_mode (bool, optional): Whether the dataset is in test mode.
            Defaults to False.
        pcd_limit_range (list, optional): The range of point cloud used to
            filter invalid predicted boxes.
            Default: [0, -40, -3, 70.4, 40, 0.0].
    """
    # TODO: use full classes of KITTI
    METAINFO = {'CLASSES': ('Pedestrian', 'Cyclist', 'Car')}

    def __init__(self,
                 data_root: str,
                 ann_file: str,
                 pipeline: List[Union[dict, Callable]] = [],
                 modality: Optional[dict] = dict(use_lidar=True),
                 box_type_3d: str = 'LiDAR',
                 filter_empty_gt: bool = True,
                 test_mode: bool = False,
                 pcd_limit_range: List[float] = [0, -40, -3, 70.4, 40, 0.0],
                 **kwargs):
        self.pcd_limit_range = pcd_limit_range
        super().__init__(
            data_root=data_root,
            ann_file=ann_file,
            pipeline=pipeline,
            modality=modality,
            box_type_3d=box_type_3d,
            filter_empty_gt=filter_empty_gt,
            test_mode=test_mode,
            **kwargs)
        assert self.modality is not None
        assert box_type_3d.lower() in ('lidar', 'camera')

    def parse_data_info(self, info: dict) -> dict:
        """Process the raw data info.

        The only difference with it in `Det3DDataset` is the specific
        process for `plane`.

        Args:
            info (dict): Raw info dict.

        Returns:
            dict: Has `ann_info` in training stage. And
            all paths have been converted to absolute paths.
        """
        if self.modality['use_lidar']:
            if 'plane' in info:
                # convert ground plane to velodyne coordinates
                plane = np.array(info['plane'])
                lidar2cam = np.array(info['lidar_points']['lidar2cam'])
                reverse = np.linalg.inv(lidar2cam)

                (plane_norm_cam, plane_off_cam) = (plane[:3],
                                                   -plane[:3] * plane[3])
                plane_norm_lidar = \
                    (reverse[:3, :3] @ plane_norm_cam[:, None])[:, 0]
                plane_off_lidar = (
                    reverse[:3, :3] @ plane_off_cam[:, None])[:, 0] + \
                    reverse[:3, 3]
                plane_lidar = np.zeros_like(plane_norm_lidar, shape=(4, ))
                plane_lidar[:3] = plane_norm_lidar
                plane_lidar[3] = -plane_norm_lidar.T @ plane_off_lidar
            else:
                plane_lidar = None

            info['plane'] = plane_lidar

        info = super().parse_data_info(info)

        return info
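
    # A minimal standalone sketch of the plane conversion above, assuming a
    # camera-frame plane ``(n, d)`` with ``n^T x + d = 0`` and a hypothetical
    # ``lidar2cam`` matrix (all numbers below are illustrative, not KITTI
    # calibration values):
    #
    #     import numpy as np
    #
    #     plane_cam = np.array([0., -1., 0., 1.65])  # ground ~1.65 m below cam
    #     lidar2cam = np.array([[0., -1., 0., 0.],
    #                           [0., 0., -1., -0.08],
    #                           [1., 0., 0., -0.27],
    #                           [0., 0., 0., 1.]])
    #     cam2lidar = np.linalg.inv(lidar2cam)
    #     n_cam, p_cam = plane_cam[:3], -plane_cam[:3] * plane_cam[3]
    #     n_lidar = cam2lidar[:3, :3] @ n_cam   # normals rotate only
    #     p_lidar = cam2lidar[:3, :3] @ p_cam + cam2lidar[:3, 3]  # point moves
    #     plane_lidar = np.append(n_lidar, -n_lidar @ p_lidar)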

    def parse_ann_info(self, info):
        """Get annotation info according to the given index.

        Args:
            info (dict): Data information of a single data sample.

        Returns:
            dict: Annotation information consists of the following keys:

                - gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`):
                    3D ground truth bboxes.
                - gt_labels_3d (np.ndarray): Labels of ground truths.
                - gt_bboxes (np.ndarray): 2D ground truth bboxes.
                - gt_labels (np.ndarray): Labels of ground truths.
                - difficulty (int): Difficulty defined by KITTI.
                    0, 1 and 2 represent easy, moderate and hard,
                    respectively.
        """
        ann_info = super().parse_ann_info(info)

        bbox_labels_3d = ann_info['gt_labels_3d']
        bbox_labels_3d = np.array(bbox_labels_3d)
        ann_info['gt_labels_3d'] = bbox_labels_3d
        ann_info['gt_labels'] = copy.deepcopy(ann_info['gt_labels_3d'])

        ann_info = self._remove_dontcare(ann_info)
        # in KITTI, lidar2cam = R0_rect @ Tr_velo_to_cam
        lidar2cam = np.array(info['images']['CAM2']['lidar2cam'])
        # convert gt_bboxes_3d to velodyne coordinates with `lidar2cam`
        gt_bboxes_3d = CameraInstance3DBoxes(
            ann_info['gt_bboxes_3d']).convert_to(self.box_mode_3d,
                                                 np.linalg.inv(lidar2cam))
        ann_info['gt_bboxes_3d'] = gt_bboxes_3d
        return ann_info

    def format_results(self,
                       outputs,
                       pklfile_prefix=None,
                       submission_prefix=None):
        """Format the results to pkl file.

        Args:
            outputs (list[dict]): Testing results of the dataset.
            pklfile_prefix (str): The prefix of pkl files. It includes
                the file path and the prefix of filename, e.g., "a/b/prefix".
                If not specified, a temp file will be created. Default: None.
            submission_prefix (str): The prefix of submitted files. It
                includes the file path and the prefix of filename, e.g.,
                "a/b/prefix". If not specified, a temp file will be created.
                Default: None.

        Returns:
            tuple: (result_files, tmp_dir), result_files is a dict containing
                the pkl filepaths, tmp_dir is the temporary directory created
                for saving files when pklfile_prefix is not specified.
        """
        if pklfile_prefix is None:
            tmp_dir = tempfile.TemporaryDirectory()
            pklfile_prefix = osp.join(tmp_dir.name, 'results')
        else:
            tmp_dir = None

        if not isinstance(outputs[0], dict):
            result_files = self.bbox2result_kitti2d(outputs, self.CLASSES,
                                                    pklfile_prefix,
                                                    submission_prefix)
        elif 'pts_bbox' in outputs[0] or 'img_bbox' in outputs[0]:
            result_files = dict()
            for name in outputs[0]:
                results_ = [out[name] for out in outputs]
                pklfile_prefix_ = pklfile_prefix + name
                if submission_prefix is not None:
                    submission_prefix_ = submission_prefix + name
                else:
                    submission_prefix_ = None
                if 'img' in name:
                    result_files_ = self.bbox2result_kitti2d(
                        results_, self.CLASSES, pklfile_prefix_,
                        submission_prefix_)
                else:
                    result_files_ = self.bbox2result_kitti(
                        results_, self.CLASSES, pklfile_prefix_,
                        submission_prefix_)
                result_files[name] = result_files_
        else:
            result_files = self.bbox2result_kitti(outputs, self.CLASSES,
                                                  pklfile_prefix,
                                                  submission_prefix)
        return result_files, tmp_dir
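
    # A minimal usage sketch for ``format_results`` (hypothetical names;
    # ``outputs`` is the per-sample list collected by the test loop):
    #
    #     result_files, tmp_dir = dataset.format_results(
    #         outputs, pklfile_prefix='work_dirs/kitti/results')
    #     # plain results yield a single KITTI-format list; dicts keyed by
    #     # 'pts_bbox' / 'img_bbox' yield one list per modality
    #     if tmp_dir is not None:
    #         tmp_dir.cleanup()  # only created when no prefix was given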

    def evaluate(self,
                 results,
                 metric=None,
                 logger=None,
                 pklfile_prefix=None,
                 submission_prefix=None,
                 show=False,
                 out_dir=None,
                 pipeline=None):
        """Evaluation in KITTI protocol.

        Args:
            results (list[dict]): Testing results of the dataset.
            metric (str | list[str], optional): Metrics to be evaluated.
                Default: None.
            logger (logging.Logger | str, optional): Logger used for printing
                related information during evaluation. Default: None.
            pklfile_prefix (str, optional): The prefix of pkl files, including
                the file path and the prefix of filename, e.g., "a/b/prefix".
                If not specified, a temp file will be created. Default: None.
            submission_prefix (str, optional): The prefix of submission data.
                If not specified, the submission data will not be generated.
                Default: None.
            show (bool, optional): Whether to visualize. Default: False.
            out_dir (str, optional): Path to save the visualization results.
                Default: None.
            pipeline (list[dict], optional): raw data loading for showing.
                Default: None.

        Returns:
            dict[str, float]: Results of each evaluation metric.
        """
        result_files, tmp_dir = self.format_results(results, pklfile_prefix)
        from mmdet3d.core.evaluation import kitti_eval
        gt_annos = [info['annos'] for info in self.data_infos]

        if isinstance(result_files, dict):
            ap_dict = dict()
            for name, result_files_ in result_files.items():
                eval_types = ['bbox', 'bev', '3d']
                if 'img' in name:
                    eval_types = ['bbox']
                ap_result_str, ap_dict_ = kitti_eval(
                    gt_annos,
                    result_files_,
                    self.CLASSES,
                    eval_types=eval_types)
                for ap_type, ap in ap_dict_.items():
                    ap_dict[f'{name}/{ap_type}'] = float('{:.4f}'.format(ap))
                print_log(
                    f'Results of {name}:\n' + ap_result_str, logger=logger)
        else:
            if metric == 'img_bbox':
                ap_result_str, ap_dict = kitti_eval(
                    gt_annos, result_files, self.CLASSES, eval_types=['bbox'])
            else:
                ap_result_str, ap_dict = kitti_eval(gt_annos, result_files,
                                                    self.CLASSES)
            print_log('\n' + ap_result_str, logger=logger)

        if tmp_dir is not None:
            tmp_dir.cleanup()
        if show or out_dir:
            self.show(results, out_dir, show=show, pipeline=pipeline)
        return ap_dict
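
    # A minimal usage sketch for ``evaluate`` (hypothetical setup; assumes
    # ``results`` is aligned one-to-one with the dataset's samples):
    #
    #     metrics = dataset.evaluate(
    #         results,
    #         pklfile_prefix='work_dirs/kitti/kitti_results',
    #         submission_prefix='work_dirs/kitti/submission')
    #     for name, ap in metrics.items():
    #         print(name, ap)  # bbox / bev / 3d APs at each difficulty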

    def bbox2result_kitti(self,
                          net_outputs,
                          class_names,
                          pklfile_prefix=None,
                          submission_prefix=None):
        """Convert 3D detection results to KITTI format for evaluation and
        test submission.

        Args:
            net_outputs (list[dict]): List of dicts storing the inferenced
                bounding boxes and scores.
            class_names (list[str]): A list of class names.
            pklfile_prefix (str): The prefix of pkl file.
            submission_prefix (str): The prefix of submission file.

        Returns:
            list[dict]: A list of dictionaries with the KITTI format.
        """
        assert len(net_outputs) == len(self.data_infos), \
            'invalid list length of network outputs'
        if submission_prefix is not None:
            mmcv.mkdir_or_exist(submission_prefix)

        det_annos = []
        print('\nConverting prediction to KITTI format')
        for idx, pred_dicts in enumerate(
                mmcv.track_iter_progress(net_outputs)):
            annos = []
            info = self.data_infos[idx]
            sample_idx = info['image']['image_idx']
            image_shape = info['image']['image_shape'][:2]
            box_dict = self.convert_valid_bboxes(pred_dicts, info)
            anno = {
                'name': [],
                'truncated': [],
                'occluded': [],
                'alpha': [],
                'bbox': [],
                'dimensions': [],
                'location': [],
                'rotation_y': [],
                'score': []
            }
            if len(box_dict['bbox']) > 0:
                box_2d_preds = box_dict['bbox']
                box_preds = box_dict['box3d_camera']
                scores = box_dict['scores']
                box_preds_lidar = box_dict['box3d_lidar']
                label_preds = box_dict['label_preds']

                for box, box_lidar, bbox, score, label in zip(
                        box_preds, box_preds_lidar, box_2d_preds, scores,
                        label_preds):
                    bbox[2:] = np.minimum(bbox[2:], image_shape[::-1])
                    bbox[:2] = np.maximum(bbox[:2], [0, 0])
                    anno['name'].append(class_names[int(label)])
                    anno['truncated'].append(0.0)
                    anno['occluded'].append(0)
                    anno['alpha'].append(
                        -np.arctan2(-box_lidar[1], box_lidar[0]) + box[6])
                    anno['bbox'].append(bbox)
                    anno['dimensions'].append(box[3:6])
                    anno['location'].append(box[:3])
                    anno['rotation_y'].append(box[6])
                    anno['score'].append(score)

                anno = {k: np.stack(v) for k, v in anno.items()}
                annos.append(anno)
            else:
                anno = {
                    'name': np.array([]),
                    'truncated': np.array([]),
                    'occluded': np.array([]),
                    'alpha': np.array([]),
                    'bbox': np.zeros([0, 4]),
                    'dimensions': np.zeros([0, 3]),
                    'location': np.zeros([0, 3]),
                    'rotation_y': np.array([]),
                    'score': np.array([]),
                }
                annos.append(anno)

            if submission_prefix is not None:
                curr_file = f'{submission_prefix}/{sample_idx:06d}.txt'
                with open(curr_file, 'w') as f:
                    bbox = anno['bbox']
                    loc = anno['location']
                    dims = anno['dimensions']  # lhw -> hwl

                    for i in range(len(bbox)):
                        print(
                            '{} -1 -1 {:.4f} {:.4f} {:.4f} {:.4f} '
                            '{:.4f} {:.4f} {:.4f} '
                            '{:.4f} {:.4f} {:.4f} {:.4f} {:.4f} {:.4f}'.format(
                                anno['name'][i], anno['alpha'][i],
                                bbox[i][0], bbox[i][1], bbox[i][2],
                                bbox[i][3], dims[i][1], dims[i][2],
                                dims[i][0], loc[i][0], loc[i][1],
                                loc[i][2], anno['rotation_y'][i],
                                anno['score'][i]),
                            file=f)

            annos[-1]['sample_idx'] = np.array(
                [sample_idx] * len(annos[-1]['score']), dtype=np.int64)

            det_annos += annos

        if pklfile_prefix is not None:
            if not pklfile_prefix.endswith(('.pkl', '.pickle')):
                out = f'{pklfile_prefix}.pkl'
            else:
                out = pklfile_prefix
            mmcv.dump(det_annos, out)
            print(f'Result is saved to {out}.')

        return det_annos
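
    # A small sketch of the ``alpha`` computation above: KITTI's observation
    # angle is the global yaw combined with the viewing angle of the box
    # centre. With a LiDAR-frame centre ``(x, y)`` and camera-frame yaw
    # ``ry`` (illustrative numbers):
    #
    #     import numpy as np
    #
    #     x, y, ry = 20.0, -5.0, -1.2         # hypothetical box
    #     viewing_angle = -np.arctan2(-y, x)  # azimuth of the box centre
    #     alpha = viewing_angle + ry          # matches the line in the loop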

    def bbox2result_kitti2d(self,
                            net_outputs,
                            class_names,
                            pklfile_prefix=None,
                            submission_prefix=None):
        """Convert 2D detection results to KITTI format for evaluation and
        test submission.

        Args:
            net_outputs (list[np.ndarray]): List of arrays storing the
                inferenced bounding boxes and scores.
            class_names (list[str]): A list of class names.
            pklfile_prefix (str): The prefix of pkl file.
            submission_prefix (str): The prefix of submission file.

        Returns:
            list[dict]: A list of dictionaries with the KITTI format.
        """
        assert len(net_outputs) == len(self.data_infos), \
            'invalid list length of network outputs'
        det_annos = []
        print('\nConverting prediction to KITTI format')
        for i, bboxes_per_sample in enumerate(
                mmcv.track_iter_progress(net_outputs)):
            annos = []
            anno = dict(
                name=[],
                truncated=[],
                occluded=[],
                alpha=[],
                bbox=[],
                dimensions=[],
                location=[],
                rotation_y=[],
                score=[])
            sample_idx = self.data_infos[i]['image']['image_idx']

            num_example = 0
            for label in range(len(bboxes_per_sample)):
                bbox = bboxes_per_sample[label]
                for j in range(bbox.shape[0]):
                    anno['name'].append(class_names[int(label)])
                    anno['truncated'].append(0.0)
                    anno['occluded'].append(0)
                    anno['alpha'].append(0.0)
                    anno['bbox'].append(bbox[j, :4])
                    # set dimensions (height, width, length) to zero
                    anno['dimensions'].append(
                        np.zeros(shape=[3], dtype=np.float32))
                    # set the 3D translation to (-1000, -1000, -1000)
                    anno['location'].append(
                        np.ones(shape=[3], dtype=np.float32) * (-1000.0))
                    anno['rotation_y'].append(0.0)
                    anno['score'].append(bbox[j, 4])
                    num_example += 1

            if num_example == 0:
                annos.append(
                    dict(
                        name=np.array([]),
                        truncated=np.array([]),
                        occluded=np.array([]),
                        alpha=np.array([]),
                        bbox=np.zeros([0, 4]),
                        dimensions=np.zeros([0, 3]),
                        location=np.zeros([0, 3]),
                        rotation_y=np.array([]),
                        score=np.array([]),
                    ))
            else:
                anno = {k: np.stack(v) for k, v in anno.items()}
                annos.append(anno)
            annos[-1]['sample_idx'] = np.array(
                [sample_idx] * num_example, dtype=np.int64)
            det_annos += annos

        if pklfile_prefix is not None:
            # save file in pkl format
            if not pklfile_prefix.endswith(('.pkl', '.pickle')):
                pklfile_path = f'{pklfile_prefix}.pkl'
            else:
                pklfile_path = pklfile_prefix
            mmcv.dump(det_annos, pklfile_path)

        if submission_prefix is not None:
            # save file in submission format
            mmcv.mkdir_or_exist(submission_prefix)
            print(f'Saving KITTI submission to {submission_prefix}')
            for i, anno in enumerate(det_annos):
                sample_idx = self.data_infos[i]['image']['image_idx']
                cur_det_file = f'{submission_prefix}/{sample_idx:06d}.txt'
                with open(cur_det_file, 'w') as f:
                    bbox = anno['bbox']
                    loc = anno['location']
                    dims = anno['dimensions'][:, ::-1]  # lhw -> hwl
                    for idx in range(len(bbox)):
                        print(
                            '{} -1 -1 {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} '
                            '{:.4f} {:.4f} {:.4f} {:.4f} {:.4f} {:.4f} '
                            '{:.4f} {:.4f}'.format(
                                anno['name'][idx], anno['alpha'][idx],
                                *bbox[idx],  # 4 floats
                                *dims[idx],  # 3 floats
                                *loc[idx],  # 3 floats
                                anno['rotation_y'][idx], anno['score'][idx]),
                            file=f,
                        )
            print(f'Result is saved to {submission_prefix}')

        return det_annos

    def convert_valid_bboxes(self, box_dict, info):
        """Convert the predicted boxes into valid ones.

        Args:
            box_dict (dict): Box dictionaries to be converted.

                - boxes_3d (:obj:`LiDARInstance3DBoxes`): 3D bounding boxes.
                - scores_3d (torch.Tensor): Scores of boxes.
                - labels_3d (torch.Tensor): Class labels of boxes.
            info (dict): Data info.

        Returns:
            dict: Valid predicted boxes.

                - bbox (np.ndarray): 2D bounding boxes.
                - box3d_camera (np.ndarray): 3D bounding boxes in
                    camera coordinate.
                - box3d_lidar (np.ndarray): 3D bounding boxes in
                    LiDAR coordinate.
                - scores (np.ndarray): Scores of boxes.
                - label_preds (np.ndarray): Class label predictions.
                - sample_idx (int): Sample index.
        """
        # TODO: refactor this function
        box_preds = box_dict['boxes_3d']
        scores = box_dict['scores_3d']
        labels = box_dict['labels_3d']
        sample_idx = info['image']['image_idx']
        box_preds.limit_yaw(offset=0.5, period=np.pi * 2)

        if len(box_preds) == 0:
            return dict(
                bbox=np.zeros([0, 4]),
                box3d_camera=np.zeros([0, 7]),
                box3d_lidar=np.zeros([0, 7]),
                scores=np.zeros([0]),
                label_preds=np.zeros([0]),
                sample_idx=sample_idx)

        rect = info['calib']['R0_rect'].astype(np.float32)
        Trv2c = info['calib']['Tr_velo_to_cam'].astype(np.float32)
        P2 = info['calib']['P2'].astype(np.float32)
        img_shape = info['image']['image_shape']
        P2 = box_preds.tensor.new_tensor(P2)

        box_preds_camera = box_preds.convert_to(Box3DMode.CAM, rect @ Trv2c)

        box_corners = box_preds_camera.corners
        box_corners_in_image = points_cam2img(box_corners, P2)
        # box_corners_in_image: [N, 8, 2]
        minxy = torch.min(box_corners_in_image, dim=1)[0]
        maxxy = torch.max(box_corners_in_image, dim=1)[0]
        box_2d_preds = torch.cat([minxy, maxxy], dim=1)
        # Post-processing
        # check box_preds_camera
        image_shape = box_preds.tensor.new_tensor(img_shape)
        valid_cam_inds = ((box_2d_preds[:, 0] < image_shape[1]) &
                          (box_2d_preds[:, 1] < image_shape[0]) &
                          (box_2d_preds[:, 2] > 0) & (box_2d_preds[:, 3] > 0))
        # check box_preds
        limit_range = box_preds.tensor.new_tensor(self.pcd_limit_range)
        valid_pcd_inds = ((box_preds.center > limit_range[:3]) &
                          (box_preds.center < limit_range[3:]))
        valid_inds = valid_cam_inds & valid_pcd_inds.all(-1)

        if valid_inds.sum() > 0:
            return dict(
                bbox=box_2d_preds[valid_inds, :].numpy(),
                box3d_camera=box_preds_camera[valid_inds].tensor.numpy(),
                box3d_lidar=box_preds[valid_inds].tensor.numpy(),
                scores=scores[valid_inds].numpy(),
                label_preds=labels[valid_inds].numpy(),
                sample_idx=sample_idx)
        else:
            return dict(
                bbox=np.zeros([0, 4]),
                box3d_camera=np.zeros([0, 7]),
                box3d_lidar=np.zeros([0, 7]),
                scores=np.zeros([0]),
                label_preds=np.zeros([0]),
                sample_idx=sample_idx)

    def _build_default_pipeline(self):
        """Build the default pipeline for this dataset."""
        pipeline = [
            dict(
                type='LoadPointsFromFile',
                coord_type='LIDAR',
                load_dim=4,
                use_dim=4,
                file_client_args=dict(backend='disk')),
            dict(
                type='DefaultFormatBundle3D',
                class_names=self.CLASSES,
                with_label=False),
            dict(type='Collect3D', keys=['points'])
        ]
        if self.modality['use_camera']:
            pipeline.insert(0, dict(type='LoadImageFromFile'))
        return Compose(pipeline)
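
    # A minimal sketch of the 2D-box recovery used in
    # ``convert_valid_bboxes``: project the 8 corners of each camera-frame
    # box with the P2 projection matrix and take min/max over the corner
    # axis (hypothetical tensors ``boxes_cam`` and ``P2``):
    #
    #     import torch
    #     from mmdet3d.core.bbox import points_cam2img
    #
    #     corners = boxes_cam.corners       # [N, 8, 3] camera-frame corners
    #     uv = points_cam2img(corners, P2)  # [N, 8, 2] pixel coordinates
    #     bbox2d = torch.cat([uv.min(dim=1)[0], uv.max(dim=1)[0]], dim=1)
    #     # bbox2d: [N, 4] as (x1, y1, x2, y2), later clipped to the image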

    def show(self, results, out_dir, show=True, pipeline=None):
        """Results visualization.

        Args:
            results (list[dict]): List of bounding boxes results.
            out_dir (str): Output directory of visualization result.
            show (bool): Whether to visualize the results online.
                Default: True.
            pipeline (list[dict], optional): raw data loading for showing.
                Default: None.
        """
        assert out_dir is not None, 'Expect out_dir, got none.'
        pipeline = self._get_pipeline(pipeline)
        for i, result in enumerate(results):
            if 'pts_bbox' in result.keys():
                result = result['pts_bbox']
            data_info = self.data_infos[i]
            pts_path = data_info['point_cloud']['velodyne_path']
            file_name = osp.split(pts_path)[-1].split('.')[0]
            points, img_metas, img = self._extract_data(
                i, pipeline, ['points', 'img_metas', 'img'])
            points = points.numpy()
            # for now we convert points into depth mode
            points = Coord3DMode.convert_point(points, Coord3DMode.LIDAR,
                                               Coord3DMode.DEPTH)
            gt_bboxes = self.get_ann_info(i)['gt_bboxes_3d'].tensor.numpy()
            show_gt_bboxes = Box3DMode.convert(gt_bboxes, Box3DMode.LIDAR,
                                               Box3DMode.DEPTH)
            pred_bboxes = result['boxes_3d'].tensor.numpy()
            show_pred_bboxes = Box3DMode.convert(pred_bboxes, Box3DMode.LIDAR,
                                                 Box3DMode.DEPTH)
            show_result(points, show_gt_bboxes, show_pred_bboxes, out_dir,
                        file_name, show)

            # multi-modality visualization
            if self.modality['use_camera'] and 'lidar2img' in img_metas.keys():
                img = img.numpy()
                # need to transpose channel to first dim
                img = img.transpose(1, 2, 0)
                show_pred_bboxes = LiDARInstance3DBoxes(
                    pred_bboxes, origin=(0.5, 0.5, 0))
                show_gt_bboxes = LiDARInstance3DBoxes(
                    gt_bboxes, origin=(0.5, 0.5, 0))
                show_multi_modality_result(
                    img,
                    show_gt_bboxes,
                    show_pred_bboxes,
                    img_metas['lidar2img'],
                    out_dir,
                    file_name,
                    box_mode='lidar',
                    show=show)
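
# A minimal end-to-end sketch (hypothetical paths; assumes KITTI has been
# preprocessed with the standard create_data tooling so the info pkl exists):
#
#     from mmdet3d.datasets import build_dataset
#
#     dataset = build_dataset(
#         dict(
#             type='KittiDataset',
#             data_root='data/kitti/',
#             ann_file='data/kitti/kitti_infos_val.pkl',
#             modality=dict(use_lidar=True, use_camera=False),
#             box_type_3d='LiDAR',
#             test_mode=True))
#     # after running inference to obtain ``results``:
#     #     metrics = dataset.evaluate(results, out_dir='work_dirs/vis')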