import copy import os import os.path as osp import tempfile import mmcv import numpy as np import torch from mmcv.utils import print_log from mmdet.datasets import DATASETS from ..core.bbox import Box3DMode, CameraInstance3DBoxes from .custom_3d import Custom3DDataset from .utils import remove_dontcare @DATASETS.register_module() class KittiDataset(Custom3DDataset): CLASSES = ('car', 'pedestrian', 'cyclist') def __init__(self, data_root, ann_file, split, pts_prefix='velodyne', pipeline=None, classes=None, modality=None, box_type_3d='LiDAR', filter_empty_gt=True, test_mode=False): super().__init__( data_root=data_root, ann_file=ann_file, pipeline=pipeline, classes=classes, modality=modality, box_type_3d=box_type_3d, filter_empty_gt=filter_empty_gt, test_mode=test_mode) self.root_split = os.path.join(self.data_root, split) assert self.modality is not None self.pcd_limit_range = [0, -40, -3, 70.4, 40, 0.0] self.pts_prefix = pts_prefix def _get_pts_filename(self, idx): pts_filename = osp.join(self.root_split, self.pts_prefix, f'{idx:06d}.bin') return pts_filename def get_data_info(self, index): info = self.data_infos[index] sample_idx = info['image']['image_idx'] img_filename = os.path.join(self.root_split, info['image']['image_path']) # TODO: consider use torch.Tensor only rect = info['calib']['R0_rect'].astype(np.float32) Trv2c = info['calib']['Tr_velo_to_cam'].astype(np.float32) P2 = info['calib']['P2'].astype(np.float32) lidar2img = P2 @ rect @ Trv2c pts_filename = self._get_pts_filename(sample_idx) input_dict = dict( sample_idx=sample_idx, pts_filename=pts_filename, img_filename=img_filename, lidar2img=lidar2img) if not self.test_mode: annos = self.get_ann_info(index) input_dict['ann_info'] = annos return input_dict def get_ann_info(self, index): # Use index to get the annos, thus the evalhook could also use this api info = self.data_infos[index] rect = info['calib']['R0_rect'].astype(np.float32) Trv2c = info['calib']['Tr_velo_to_cam'].astype(np.float32) annos = info['annos'] # we need other objects to avoid collision when sample annos = remove_dontcare(annos) loc = annos['location'] dims = annos['dimensions'] rots = annos['rotation_y'] gt_names = annos['name'] # print(gt_names, len(loc)) gt_bboxes_3d = np.concatenate([loc, dims, rots[..., np.newaxis]], axis=1).astype(np.float32) # convert gt_bboxes_3d to velodyne coordinates gt_bboxes_3d = CameraInstance3DBoxes(gt_bboxes_3d).convert_to( self.box_mode_3d, np.linalg.inv(rect @ Trv2c)) gt_bboxes = annos['bbox'] selected = self.drop_arrays_by_name(gt_names, ['DontCare']) # gt_bboxes_3d = gt_bboxes_3d[selected].astype('float32') gt_bboxes = gt_bboxes[selected].astype('float32') gt_names = gt_names[selected] gt_labels = [] for cat in gt_names: if cat in self.CLASSES: gt_labels.append(self.CLASSES.index(cat)) else: gt_labels.append(-1) gt_labels = np.array(gt_labels) gt_labels_3d = copy.deepcopy(gt_labels) anns_results = dict( gt_bboxes_3d=gt_bboxes_3d, gt_labels_3d=gt_labels_3d, gt_bboxes=gt_bboxes, gt_labels=gt_labels) return anns_results def drop_arrays_by_name(self, gt_names, used_classes): inds = [i for i, x in enumerate(gt_names) if x not in used_classes] inds = np.array(inds, dtype=np.int64) return inds def keep_arrays_by_name(self, gt_names, used_classes): inds = [i for i, x in enumerate(gt_names) if x in used_classes] inds = np.array(inds, dtype=np.int64) return inds def format_results(self, outputs, pklfile_prefix=None, submission_prefix=None): if pklfile_prefix is None: tmp_dir = tempfile.TemporaryDirectory() pklfile_prefix = osp.join(tmp_dir.name, 'results') else: tmp_dir = None if not isinstance(outputs[0], dict): result_files = self.bbox2result_kitti2d(outputs, self.CLASSES, pklfile_prefix, submission_prefix) else: result_files = self.bbox2result_kitti(outputs, self.CLASSES, pklfile_prefix, submission_prefix) return result_files, tmp_dir def evaluate(self, results, metric=None, logger=None, pklfile_prefix=None, submission_prefix=None, result_names=['pts_bbox']): """Evaluation in KITTI protocol. Args: results (list): Testing results of the dataset. metric (str | list[str]): Metrics to be evaluated. logger (logging.Logger | str | None): Logger used for printing related information during evaluation. Default: None. pklfile_prefix (str | None): The prefix of pkl files. It includes the file path and the prefix of filename, e.g., "a/b/prefix". If not specified, a temp file will be created. Default: None. submission_prefix (str | None): The prefix of submission datas. If not specified, the submission data will not be generated. Returns: dict[str: float] """ result_files, tmp_dir = self.format_results(results, pklfile_prefix) from mmdet3d.core.evaluation import kitti_eval gt_annos = [info['annos'] for info in self.data_infos] if metric == 'img_bbox': ap_result_str, ap_dict = kitti_eval( gt_annos, result_files, self.CLASSES, eval_types=['bbox']) else: ap_result_str, ap_dict = kitti_eval(gt_annos, result_files, self.CLASSES) print_log('\n' + ap_result_str, logger=logger) if tmp_dir is not None: tmp_dir.cleanup() return ap_dict def bbox2result_kitti(self, net_outputs, class_names, pklfile_prefix=None, submission_prefix=None): assert len(net_outputs) == len(self.data_infos) if submission_prefix is not None: mmcv.mkdir_or_exist(submission_prefix) det_annos = [] print('\nConverting prediction to KITTI format') for idx, pred_dicts in enumerate( mmcv.track_iter_progress(net_outputs)): annos = [] info = self.data_infos[idx] sample_idx = info['image']['image_idx'] image_shape = info['image']['image_shape'][:2] box_dict = self.convert_valid_bboxes(pred_dicts, info) if len(box_dict['bbox']) > 0: box_2d_preds = box_dict['bbox'] box_preds = box_dict['box3d_camera'] scores = box_dict['scores'] box_preds_lidar = box_dict['box3d_lidar'] label_preds = box_dict['label_preds'] anno = { 'name': [], 'truncated': [], 'occluded': [], 'alpha': [], 'bbox': [], 'dimensions': [], 'location': [], 'rotation_y': [], 'score': [] } for box, box_lidar, bbox, score, label in zip( box_preds, box_preds_lidar, box_2d_preds, scores, label_preds): bbox[2:] = np.minimum(bbox[2:], image_shape[::-1]) bbox[:2] = np.maximum(bbox[:2], [0, 0]) anno['name'].append(class_names[int(label)]) anno['truncated'].append(0.0) anno['occluded'].append(0) anno['alpha'].append( -np.arctan2(-box_lidar[1], box_lidar[0]) + box[6]) anno['bbox'].append(bbox) anno['dimensions'].append(box[3:6]) anno['location'].append(box[:3]) anno['rotation_y'].append(box[6]) anno['score'].append(score) anno = {k: np.stack(v) for k, v in anno.items()} annos.append(anno) if submission_prefix is not None: curr_file = f'{submission_prefix}/{sample_idx:06d}.txt' with open(curr_file, 'w') as f: bbox = anno['bbox'] loc = anno['location'] dims = anno['dimensions'] # lhw -> hwl for idx in range(len(bbox)): print( '{} -1 -1 {:.4f} {:.4f} {:.4f} {:.4f} ' '{:.4f} {:.4f} {:.4f} ' '{:.4f} {:.4f} {:.4f} {:.4f} {:.4f} {:.4f}'. format(anno['name'][idx], anno['alpha'][idx], bbox[idx][0], bbox[idx][1], bbox[idx][2], bbox[idx][3], dims[idx][1], dims[idx][2], dims[idx][0], loc[idx][0], loc[idx][1], loc[idx][2], anno['rotation_y'][idx], anno['score'][idx]), file=f) else: annos.append({ 'name': np.array([]), 'truncated': np.array([]), 'occluded': np.array([]), 'alpha': np.array([]), 'bbox': np.zeros([0, 4]), 'dimensions': np.zeros([0, 3]), 'location': np.zeros([0, 3]), 'rotation_y': np.array([]), 'score': np.array([]), }) annos[-1]['sample_idx'] = np.array( [sample_idx] * len(annos[-1]['score']), dtype=np.int64) det_annos += annos if pklfile_prefix is not None: if not pklfile_prefix.endswith(('.pkl', '.pickle')): out = f'{pklfile_prefix}.pkl' mmcv.dump(det_annos, out) print('Result is saved to %s' % out) return det_annos def bbox2result_kitti2d(self, net_outputs, class_names, pklfile_prefix=None, submission_prefix=None): """Convert results to kitti format for evaluation and test submission Args: net_outputs (List[array]): list of array storing the bbox and score class_nanes (List[String]): A list of class names pklfile_prefix (str | None): The prefix of pkl file. submission_prefix (str | None): The prefix of submission file. Return: List([dict]): A list of dict have the kitti format """ assert len(net_outputs) == len(self.data_infos) det_annos = [] print('\nConverting prediction to KITTI format') for i, bboxes_per_sample in enumerate( mmcv.track_iter_progress(net_outputs)): annos = [] anno = dict( name=[], truncated=[], occluded=[], alpha=[], bbox=[], dimensions=[], location=[], rotation_y=[], score=[]) sample_idx = self.data_infos[i]['image']['image_idx'] num_example = 0 for label in range(len(bboxes_per_sample)): bbox = bboxes_per_sample[label] for i in range(bbox.shape[0]): anno['name'].append(class_names[int(label)]) anno['truncated'].append(0.0) anno['occluded'].append(0) anno['alpha'].append(0.0) anno['bbox'].append(bbox[i, :4]) # set dimensions (height, width, length) to zero anno['dimensions'].append( np.zeros(shape=[3], dtype=np.float32)) # set the 3D translation to (-1000, -1000, -1000) anno['location'].append( np.ones(shape=[3], dtype=np.float32) * (-1000.0)) anno['rotation_y'].append(0.0) anno['score'].append(bbox[i, 4]) num_example += 1 if num_example == 0: annos.append( dict( name=np.array([]), truncated=np.array([]), occluded=np.array([]), alpha=np.array([]), bbox=np.zeros([0, 4]), dimensions=np.zeros([0, 3]), location=np.zeros([0, 3]), rotation_y=np.array([]), score=np.array([]), )) else: anno = {k: np.stack(v) for k, v in anno.items()} annos.append(anno) annos[-1]['sample_idx'] = np.array( [sample_idx] * num_example, dtype=np.int64) det_annos += annos if pklfile_prefix is not None: # save file in pkl format pklfile_path = ( pklfile_prefix[:-4] if pklfile_prefix.endswith( ('.pkl', '.pickle')) else pklfile_prefix) mmcv.dump(det_annos, pklfile_path) if submission_prefix is not None: # save file in submission format mmcv.mkdir_or_exist(submission_prefix) print(f'Saving KITTI submission to {submission_prefix}') for i, anno in enumerate(det_annos): sample_idx = self.data_infos[i]['image']['image_idx'] cur_det_file = f'{submission_prefix}/{sample_idx:06d}.txt' with open(cur_det_file, 'w') as f: bbox = anno['bbox'] loc = anno['location'] dims = anno['dimensions'][::-1] # lhw -> hwl for idx in range(len(bbox)): print( '{} -1 -1 {:4f} {:4f} {:4f} {:4f} {:4f} {:4f} ' '{:4f} {:4f} {:4f} {:4f} {:4f} {:4f} {:4f}'.format( anno['name'][idx], anno['alpha'][idx], *bbox[idx], # 4 float *dims[idx], # 3 float *loc[idx], # 3 float anno['rotation_y'][idx], anno['score'][idx]), file=f, ) print('Result is saved to {}'.format(submission_prefix)) return det_annos def convert_valid_bboxes(self, box_dict, info): # TODO: refactor this function box_preds = box_dict['boxes_3d'] scores = box_dict['scores_3d'] labels = box_dict['labels_3d'] sample_idx = info['image']['image_idx'] # TODO: remove the hack of yaw box_preds.tensor[:, -1] = box_preds.tensor[:, -1] - np.pi box_preds.limit_yaw(offset=0.5, period=np.pi * 2) if len(box_preds) == 0: return dict( bbox=np.zeros([0, 4]), box3d_camera=np.zeros([0, 7]), box3d_lidar=np.zeros([0, 7]), scores=np.zeros([0]), label_preds=np.zeros([0, 4]), sample_idx=sample_idx) from mmdet3d.core.bbox import box_torch_ops rect = info['calib']['R0_rect'].astype(np.float32) Trv2c = info['calib']['Tr_velo_to_cam'].astype(np.float32) P2 = info['calib']['P2'].astype(np.float32) img_shape = info['image']['image_shape'] P2 = box_preds.tensor.new_tensor(P2) box_preds_camera = box_preds.convert_to(Box3DMode.CAM, rect @ Trv2c) box_corners = box_preds_camera.corners box_corners_in_image = box_torch_ops.project_to_image(box_corners, P2) # box_corners_in_image: [N, 8, 2] minxy = torch.min(box_corners_in_image, dim=1)[0] maxxy = torch.max(box_corners_in_image, dim=1)[0] box_2d_preds = torch.cat([minxy, maxxy], dim=1) # Post-processing # check box_preds_camera image_shape = box_preds.tensor.new_tensor(img_shape) valid_cam_inds = ((box_preds_camera.tensor[:, 0] < image_shape[1]) & (box_preds_camera.tensor[:, 1] < image_shape[0]) & (box_preds_camera.tensor[:, 2] > 0) & (box_preds_camera.tensor[:, 3] > 0)) # check box_preds limit_range = box_preds.tensor.new_tensor(self.pcd_limit_range) valid_pcd_inds = ((box_preds.center > limit_range[:3]) & (box_preds.center < limit_range[3:])) valid_inds = valid_cam_inds & valid_pcd_inds.all(-1) if valid_inds.sum() > 0: return dict( bbox=box_2d_preds[valid_inds, :].numpy(), box3d_camera=box_preds_camera[valid_inds].tensor.numpy(), box3d_lidar=box_preds[valid_inds].tensor.numpy(), scores=scores[valid_inds].numpy(), label_preds=labels[valid_inds].numpy(), sample_idx=sample_idx, ) else: return dict( bbox=np.zeros([0, 4]), box3d_camera=np.zeros([0, 7]), box3d_lidar=np.zeros([0, 7]), scores=np.zeros([0]), label_preds=np.zeros([0, 4]), sample_idx=sample_idx, )