import mmcv
import numpy as np
import pyquaternion
import tempfile
from nuscenes.utils.data_classes import Box as NuScenesBox
from os import path as osp

from mmdet.datasets import DATASETS
from ..core import show_result
from ..core.bbox import Box3DMode, LiDARInstance3DBoxes
from .custom_3d import Custom3DDataset


@DATASETS.register_module()
class NuScenesDataset(Custom3DDataset):
    """NuScenes Dataset.

    This class serves as the API for experiments on the NuScenes Dataset.

    Please refer to `NuScenes Dataset <https://www.nuscenes.org/download>`_
    for data downloading. It is recommended to symlink the dataset root to
    $MMDETECTION3D/data and organize it as the doc shows.

    Args:
        ann_file (str): Path of annotation file.
        pipeline (list[dict], optional): Pipeline used for data processing.
            Defaults to None.
        data_root (str): Path of dataset root.
        classes (tuple[str], optional): Classes used in the dataset.
            Defaults to None.
        load_interval (int, optional): Interval of loading the dataset. It is
            used to uniformly sample the dataset. Defaults to 1.
        with_velocity (bool, optional): Whether to include velocity prediction
            in the experiments. Defaults to True.
        modality (dict, optional): Modality to specify the sensor data used
            as input. Defaults to None.
        box_type_3d (str, optional): Type of 3D box of this dataset.
            Based on the `box_type_3d`, the dataset will encapsulate the box
            in its original format and then convert it to `box_type_3d`.
            Defaults to 'LiDAR' in this dataset. Available options include

            - 'LiDAR': box in LiDAR coordinates
            - 'Depth': box in depth coordinates, usually for indoor datasets
            - 'Camera': box in camera coordinates
        filter_empty_gt (bool, optional): Whether to filter empty GT.
            Defaults to True.
        test_mode (bool, optional): Whether the dataset is in test mode.
            Defaults to False.
        eval_version (str, optional): Configuration version of evaluation.
            Defaults to 'detection_cvpr_2019'.
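
    Example:
        A minimal construction sketch; the paths below are illustrative
        placeholders rather than files shipped with this module::

            >>> dataset = NuScenesDataset(
            ...     ann_file='data/nuscenes/nuscenes_infos_val.pkl',
            ...     data_root='data/nuscenes/',
            ...     pipeline=None)
            >>> input_dict = dataset.get_data_info(0)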
""" NameMapping = { 'movable_object.barrier': 'barrier', 'vehicle.bicycle': 'bicycle', 'vehicle.bus.bendy': 'bus', 'vehicle.bus.rigid': 'bus', 'vehicle.car': 'car', 'vehicle.construction': 'construction_vehicle', 'vehicle.motorcycle': 'motorcycle', 'human.pedestrian.adult': 'pedestrian', 'human.pedestrian.child': 'pedestrian', 'human.pedestrian.construction_worker': 'pedestrian', 'human.pedestrian.police_officer': 'pedestrian', 'movable_object.trafficcone': 'traffic_cone', 'vehicle.trailer': 'trailer', 'vehicle.truck': 'truck' } DefaultAttribute = { 'car': 'vehicle.parked', 'pedestrian': 'pedestrian.moving', 'trailer': 'vehicle.parked', 'truck': 'vehicle.parked', 'bus': 'vehicle.moving', 'motorcycle': 'cycle.without_rider', 'construction_vehicle': 'vehicle.parked', 'bicycle': 'cycle.without_rider', 'barrier': '', 'traffic_cone': '', } AttrMapping = { 'cycle.with_rider': 0, 'cycle.without_rider': 1, 'pedestrian.moving': 2, 'pedestrian.standing': 3, 'pedestrian.sitting_lying_down': 4, 'vehicle.moving': 5, 'vehicle.parked': 6, 'vehicle.stopped': 7, } AttrMapping_rev = [ 'cycle.with_rider', 'cycle.without_rider', 'pedestrian.moving', 'pedestrian.standing', 'pedestrian.sitting_lying_down', 'vehicle.moving', 'vehicle.parked', 'vehicle.stopped', ] CLASSES = ('car', 'truck', 'trailer', 'bus', 'construction_vehicle', 'bicycle', 'motorcycle', 'pedestrian', 'traffic_cone', 'barrier') def __init__(self, ann_file, pipeline=None, data_root=None, classes=None, load_interval=1, with_velocity=True, modality=None, box_type_3d='LiDAR', filter_empty_gt=True, test_mode=False, eval_version='detection_cvpr_2019'): self.load_interval = load_interval super().__init__( data_root=data_root, ann_file=ann_file, pipeline=pipeline, classes=classes, modality=modality, box_type_3d=box_type_3d, filter_empty_gt=filter_empty_gt, test_mode=test_mode) self.with_velocity = with_velocity self.eval_version = eval_version from nuscenes.eval.detection.config import config_factory self.eval_detection_configs = config_factory(self.eval_version) if self.modality is None: self.modality = dict( use_camera=False, use_lidar=True, use_radar=False, use_map=False, use_external=False, ) def load_annotations(self, ann_file): """Load annotations from ann_file. Args: ann_file (str): Path of the annotation file. Returns: list[dict]: List of annotations sorted by timestamps. """ data = mmcv.load(ann_file) data_infos = list(sorted(data['infos'], key=lambda e: e['timestamp'])) data_infos = data_infos[::self.load_interval] self.metadata = data['metadata'] self.version = self.metadata['version'] return data_infos def get_data_info(self, index): """Get data info according to the given index. Args: index (int): Index of the sample data to get. Returns: dict: Standard input_dict consists of the data information. 
                - sample_idx (str): Sample index.
                - pts_filename (str): Filename of point clouds.
                - sweeps (list[dict]): Infos of sweeps.
                - timestamp (float): Sample timestamp.
                - img_filename (str, optional): Image filename.
                - lidar2img (list[np.ndarray], optional): Transformations
                    from lidar to different cameras.
                - ann_info (dict): Annotation info.
        """
        info = self.data_infos[index]

        # standard protocol modified from SECOND.Pytorch
        input_dict = dict(
            sample_idx=info['token'],
            pts_filename=info['lidar_path'],
            sweeps=info['sweeps'],
            timestamp=info['timestamp'] / 1e6,
        )

        if self.modality['use_camera']:
            image_paths = []
            lidar2img_rts = []
            for cam_type, cam_info in info['cams'].items():
                image_paths.append(cam_info['data_path'])
                # obtain lidar to image transformation matrix
                lidar2cam_r = np.linalg.inv(cam_info['sensor2lidar_rotation'])
                lidar2cam_t = cam_info[
                    'sensor2lidar_translation'] @ lidar2cam_r.T
                lidar2cam_rt = np.eye(4)
                lidar2cam_rt[:3, :3] = lidar2cam_r.T
                lidar2cam_rt[3, :3] = -lidar2cam_t
                intrinsic = cam_info['cam_intrinsic']
                viewpad = np.eye(4)
                viewpad[:intrinsic.shape[0], :intrinsic.shape[1]] = intrinsic
                lidar2img_rt = (viewpad @ lidar2cam_rt.T)
                lidar2img_rts.append(lidar2img_rt)

            input_dict.update(
                dict(
                    img_filename=image_paths,
                    lidar2img=lidar2img_rts,
                ))

        if not self.test_mode:
            annos = self.get_ann_info(index)
            input_dict['ann_info'] = annos

        return input_dict

    def get_ann_info(self, index):
        """Get annotation info according to the given index.

        Args:
            index (int): Index of the annotation data to get.

        Returns:
            dict: Standard annotation dictionary consisting of the data
                information:

                - gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`):
                    3D ground truth bboxes
                - gt_labels_3d (np.ndarray): Labels of ground truths.
                - gt_names (list[str]): Class names of ground truths.
        """
        info = self.data_infos[index]
        # filter out bboxes containing no points
        mask = info['num_lidar_pts'] > 0
        gt_bboxes_3d = info['gt_boxes'][mask]
        gt_names_3d = info['gt_names'][mask]
        gt_labels_3d = []
        for cat in gt_names_3d:
            if cat in self.CLASSES:
                gt_labels_3d.append(self.CLASSES.index(cat))
            else:
                gt_labels_3d.append(-1)
        gt_labels_3d = np.array(gt_labels_3d)

        if self.with_velocity:
            gt_velocity = info['gt_velocity'][mask]
            nan_mask = np.isnan(gt_velocity[:, 0])
            gt_velocity[nan_mask] = [0.0, 0.0]
            gt_bboxes_3d = np.concatenate([gt_bboxes_3d, gt_velocity],
                                          axis=-1)

        # the nuscenes box center is [0.5, 0.5, 0.5], we change it to be
        # the same as KITTI (0.5, 0.5, 0)
        gt_bboxes_3d = LiDARInstance3DBoxes(
            gt_bboxes_3d,
            box_dim=gt_bboxes_3d.shape[-1],
            origin=(0.5, 0.5, 0.5)).convert_to(self.box_mode_3d)

        anns_results = dict(
            gt_bboxes_3d=gt_bboxes_3d,
            gt_labels_3d=gt_labels_3d,
            gt_names=gt_names_3d)
        return anns_results

    def _format_bbox(self, results, jsonfile_prefix=None):
        """Convert the results to the standard format.

        Args:
            results (list[dict]): Testing results of the dataset.
            jsonfile_prefix (str): The prefix of the output jsonfile.
                You can specify the output directory/filename by
                modifying the jsonfile_prefix. Default: None.

        Returns:
            str: Path of the output json file.
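
        Example:
            The dumped ``results_nusc.json`` follows the nuScenes submission
            layout; one entry looks roughly like the sketch below (all values
            are illustrative)::

                {
                    "meta": {"use_lidar": true, "use_camera": false},
                    "results": {
                        "<sample_token>": [{
                            "sample_token": "<sample_token>",
                            "translation": [x, y, z],
                            "size": [w, l, h],
                            "rotation": [w, x, y, z],
                            "velocity": [vx, vy],
                            "detection_name": "car",
                            "detection_score": 0.9,
                            "attribute_name": "vehicle.moving"
                        }]
                    }
                }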
""" nusc_annos = {} mapped_class_names = self.CLASSES print('Start to convert detection format...') for sample_id, det in enumerate(mmcv.track_iter_progress(results)): annos = [] boxes = output_to_nusc_box(det) sample_token = self.data_infos[sample_id]['token'] boxes = lidar_nusc_box_to_global(self.data_infos[sample_id], boxes, mapped_class_names, self.eval_detection_configs, self.eval_version) for i, box in enumerate(boxes): name = mapped_class_names[box.label] if np.sqrt(box.velocity[0]**2 + box.velocity[1]**2) > 0.2: if name in [ 'car', 'construction_vehicle', 'bus', 'truck', 'trailer', ]: attr = 'vehicle.moving' elif name in ['bicycle', 'motorcycle']: attr = 'cycle.with_rider' else: attr = NuScenesDataset.DefaultAttribute[name] else: if name in ['pedestrian']: attr = 'pedestrian.standing' elif name in ['bus']: attr = 'vehicle.stopped' else: attr = NuScenesDataset.DefaultAttribute[name] nusc_anno = dict( sample_token=sample_token, translation=box.center.tolist(), size=box.wlh.tolist(), rotation=box.orientation.elements.tolist(), velocity=box.velocity[:2].tolist(), detection_name=name, detection_score=box.score, attribute_name=attr) annos.append(nusc_anno) nusc_annos[sample_token] = annos nusc_submissions = { 'meta': self.modality, 'results': nusc_annos, } mmcv.mkdir_or_exist(jsonfile_prefix) res_path = osp.join(jsonfile_prefix, 'results_nusc.json') print('Results writes to', res_path) mmcv.dump(nusc_submissions, res_path) return res_path def _evaluate_single(self, result_path, logger=None, metric='bbox', result_name='pts_bbox'): """Evaluation for a single model in nuScenes protocol. Args: result_path (str): Path of the result file. logger (logging.Logger | str | None): Logger used for printing related information during evaluation. Default: None. metric (str): Metric name used for evaluation. Default: 'bbox'. result_name (str): Result name in the metric prefix. Default: 'pts_bbox'. Returns: dict: Dictionary of evaluation details. """ from nuscenes import NuScenes from nuscenes.eval.detection.evaluate import NuScenesEval output_dir = osp.join(*osp.split(result_path)[:-1]) nusc = NuScenes( version=self.version, dataroot=self.data_root, verbose=False) eval_set_map = { 'v1.0-mini': 'mini_train', 'v1.0-trainval': 'val', } nusc_eval = NuScenesEval( nusc, config=self.eval_detection_configs, result_path=result_path, eval_set=eval_set_map[self.version], output_dir=output_dir, verbose=False) nusc_eval.main(render_curves=False) # record metrics metrics = mmcv.load(osp.join(output_dir, 'metrics_summary.json')) detail = dict() metric_prefix = f'{result_name}_NuScenes' for name in self.CLASSES: for k, v in metrics['label_aps'][name].items(): val = float('{:.4f}'.format(v)) detail['{}/{}_AP_dist_{}'.format(metric_prefix, name, k)] = val for k, v in metrics['label_tp_errors'][name].items(): val = float('{:.4f}'.format(v)) detail['{}/{}_{}'.format(metric_prefix, name, k)] = val detail['{}/NDS'.format(metric_prefix)] = metrics['nd_score'] detail['{}/mAP'.format(metric_prefix)] = metrics['mean_ap'] return detail def format_results(self, results, jsonfile_prefix=None): """Format the results to json (standard format for COCO evaluation). Args: results (list[dict]): Testing results of the dataset. jsonfile_prefix (str | None): The prefix of json files. It includes the file path and the prefix of filename, e.g., "a/b/prefix". If not specified, a temp file will be created. Default: None. 

        Returns:
            tuple (dict, str): result_files is a dict containing the json
                filepaths, and tmp_dir is the temporary directory created
                for saving json files when jsonfile_prefix is not specified.
        """
        assert isinstance(results, list), 'results must be a list'
        assert len(results) == len(self), (
            'The length of results is not equal to the dataset len: {} != {}'.
            format(len(results), len(self)))

        if jsonfile_prefix is None:
            tmp_dir = tempfile.TemporaryDirectory()
            jsonfile_prefix = osp.join(tmp_dir.name, 'results')
        else:
            tmp_dir = None

        if not isinstance(results[0], dict):
            result_files = self._format_bbox(results, jsonfile_prefix)
        else:
            result_files = dict()
            for name in results[0]:
                print(f'\nFormatting bboxes of {name}')
                results_ = [out[name] for out in results]
                tmp_file_ = osp.join(jsonfile_prefix, name)
                result_files.update(
                    {name: self._format_bbox(results_, tmp_file_)})

        return result_files, tmp_dir

    def evaluate(self,
                 results,
                 metric='bbox',
                 logger=None,
                 jsonfile_prefix=None,
                 result_names=['pts_bbox'],
                 show=False,
                 out_dir=None):
        """Evaluation in nuScenes protocol.

        Args:
            results (list[dict]): Testing results of the dataset.
            metric (str | list[str]): Metrics to be evaluated.
            logger (logging.Logger | str | None): Logger used for printing
                related information during evaluation. Default: None.
            jsonfile_prefix (str | None): The prefix of json files. It
                includes the file path and the prefix of filename, e.g.,
                "a/b/prefix". If not specified, a temp file will be created.
                Default: None.
            result_names (list[str]): Result names in the metric prefix.
                Default: ['pts_bbox'].
            show (bool): Whether to visualize. Default: False.
            out_dir (str): Path to save the visualization results.
                Default: None.

        Returns:
            dict[str, float]: Evaluation results.
        """
        result_files, tmp_dir = self.format_results(results, jsonfile_prefix)

        if isinstance(result_files, dict):
            results_dict = dict()
            for name in result_names:
                print('Evaluating bboxes of {}'.format(name))
                ret_dict = self._evaluate_single(result_files[name])
                results_dict.update(ret_dict)
        elif isinstance(result_files, str):
            results_dict = self._evaluate_single(result_files)

        if tmp_dir is not None:
            tmp_dir.cleanup()

        if show:
            self.show(results, out_dir)
        return results_dict

    def show(self, results, out_dir):
        """Results visualization.

        Args:
            results (list[dict]): List of bounding boxes results.
            out_dir (str): Output directory of visualization result.
        """
        for i, result in enumerate(results):
            example = self.prepare_test_data(i)
            points = example['points'][0]._data.numpy()
            data_info = self.data_infos[i]
            pts_path = data_info['lidar_path']
            file_name = osp.split(pts_path)[-1].split('.')[0]
            # for now we convert points into depth mode
            points = points[..., [1, 0, 2]]
            points[..., 0] *= -1
            inds = result['pts_bbox']['scores_3d'] > 0.1
            gt_bboxes = self.get_ann_info(i)['gt_bboxes_3d'].tensor
            gt_bboxes = Box3DMode.convert(gt_bboxes, Box3DMode.LIDAR,
                                          Box3DMode.DEPTH)
            gt_bboxes[..., 2] += gt_bboxes[..., 5] / 2
            pred_bboxes = result['pts_bbox']['boxes_3d'][inds].tensor.numpy()
            pred_bboxes = Box3DMode.convert(pred_bboxes, Box3DMode.LIDAR,
                                            Box3DMode.DEPTH)
            pred_bboxes[..., 2] += pred_bboxes[..., 5] / 2
            show_result(points, gt_bboxes, pred_bboxes, out_dir, file_name)


def output_to_nusc_box(detection):
    """Convert the detection output to the box class used by nuScenes.

    Args:
        detection (dict): Detection results.

            - boxes_3d (:obj:`BaseInstance3DBoxes`): Detection bboxes.
            - scores_3d (torch.Tensor): Detection scores.
            - labels_3d (torch.Tensor): Predicted box labels.

    Returns:
        list[:obj:`NuScenesBox`]: List of standard NuScenesBoxes.
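
    Example:
        A minimal call sketch; the tensors below are placeholders for real
        network outputs::

            >>> det = dict(
            ...     boxes_3d=pred_boxes,  # :obj:`BaseInstance3DBoxes`
            ...     scores_3d=scores,  # torch.Tensor of shape (N, )
            ...     labels_3d=labels)  # torch.Tensor of shape (N, )
            >>> nusc_boxes = output_to_nusc_box(det)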
""" box3d = detection['boxes_3d'] scores = detection['scores_3d'].numpy() labels = detection['labels_3d'].numpy() box_gravity_center = box3d.gravity_center.numpy() box_dims = box3d.dims.numpy() box_yaw = box3d.yaw.numpy() # TODO: check whether this is necessary # with dir_offset & dir_limit in the head box_yaw = -box_yaw - np.pi / 2 box_list = [] for i in range(len(box3d)): quat = pyquaternion.Quaternion(axis=[0, 0, 1], radians=box_yaw[i]) velocity = (*box3d.tensor[i, 7:9], 0.0) # velo_val = np.linalg.norm(box3d[i, 7:9]) # velo_ori = box3d[i, 6] # velocity = ( # velo_val * np.cos(velo_ori), velo_val * np.sin(velo_ori), 0.0) box = NuScenesBox( box_gravity_center[i], box_dims[i], quat, label=labels[i], score=scores[i], velocity=velocity) box_list.append(box) return box_list def lidar_nusc_box_to_global(info, boxes, classes, eval_configs, eval_version='detection_cvpr_2019'): """Convert the box from ego to global coordinate. Args: info (dict): Info for a specific sample data, including the calibration information. boxes (list[:obj:``NuScenesBox``]): List of predicted NuScenesBoxes. classes (list[str]): Mapped classes in the evaluation. eval_configs (object): Evaluation configuration object. eval_version (str): Evaluation version. Default: 'detection_cvpr_2019' Returns: list: List of standard NuScenesBoxes in the global coordinate. """ box_list = [] for box in boxes: # Move box to ego vehicle coord system box.rotate(pyquaternion.Quaternion(info['lidar2ego_rotation'])) box.translate(np.array(info['lidar2ego_translation'])) # filter det in ego. cls_range_map = eval_configs.class_range radius = np.linalg.norm(box.center[:2], 2) det_range = cls_range_map[classes[box.label]] if radius > det_range: continue # Move box to global coord system box.rotate(pyquaternion.Quaternion(info['ego2global_rotation'])) box.translate(np.array(info['ego2global_translation'])) box_list.append(box) return box_list