import pickle
from pathlib import Path

import numpy as np
from mmcv import track_iter_progress

from mmdet3d.core.bbox import box_np_ops
from .kitti_data_utils import get_kitti_image_info


def convert_to_kitti_info_version2(info):
    """Convert kitti info v1 to v2 in place, if possible.

    v2 groups the flat v1 keys (``img_shape``, ``calib/P2``, ...) into
    nested ``image`` / ``calib`` / ``point_cloud`` dicts.

    Args:
        info (dict): Info of the input kitti data.
            - image (dict): image info
            - calib (dict): calibration info
            - point_cloud (dict): point cloud info
    """
    # Already v2 when all three grouped keys exist; otherwise rebuild them
    # from the flat v1 keys.
    if 'image' not in info or 'calib' not in info or 'point_cloud' not in info:
        info['image'] = {
            'image_shape': info['img_shape'],
            'image_idx': info['image_idx'],
            'image_path': info['img_path'],
        }
        info['calib'] = {
            'R0_rect': info['calib/R0_rect'],
            'Tr_velo_to_cam': info['calib/Tr_velo_to_cam'],
            'P2': info['calib/P2'],
        }
        info['point_cloud'] = {
            'velodyne_path': info['velodyne_path'],
        }


def _read_imageset_file(path):
    """Read an ImageSets split file and return its sample indices as ints."""
    with open(path, 'r') as f:
        lines = f.readlines()
    return [int(line) for line in lines]


def _calculate_num_points_in_gt(data_path,
                                infos,
                                relative_path,
                                remove_outside=True,
                                num_features=4):
    """Annotate each info with the number of lidar points per GT box.

    Adds ``annos['num_points_in_gt']`` (int32 array) to every info dict;
    ``DontCare`` boxes get -1. ``infos`` is mutated in place.

    Args:
        data_path (str): Path of the data root.
        infos (list[dict]): KITTI info dicts (v2 layout).
        relative_path (bool): Whether ``velodyne_path`` is relative
            to ``data_path``.
        remove_outside (bool): Whether to drop points outside the image
            frustum before counting. Default: True.
        num_features (int): Number of features per lidar point. Default: 4.
    """
    for info in track_iter_progress(infos):
        pc_info = info['point_cloud']
        image_info = info['image']
        calib = info['calib']
        if relative_path:
            v_path = str(Path(data_path) / pc_info['velodyne_path'])
        else:
            v_path = pc_info['velodyne_path']
        points_v = np.fromfile(
            v_path, dtype=np.float32, count=-1).reshape([-1, num_features])
        rect = calib['R0_rect']
        Trv2c = calib['Tr_velo_to_cam']
        P2 = calib['P2']
        if remove_outside:
            points_v = box_np_ops.remove_outside_points(
                points_v, rect, Trv2c, P2, image_info['image_shape'])

        annos = info['annos']
        # KITTI lists 'DontCare' entries last, so the first num_obj rows of
        # each annotation array are the real objects.
        num_obj = len([n for n in annos['name'] if n != 'DontCare'])
        dims = annos['dimensions'][:num_obj]
        loc = annos['location'][:num_obj]
        rots = annos['rotation_y'][:num_obj]
        gt_boxes_camera = np.concatenate([loc, dims, rots[..., np.newaxis]],
                                         axis=1)
        gt_boxes_lidar = box_np_ops.box_camera_to_lidar(
            gt_boxes_camera, rect, Trv2c)
        indices = box_np_ops.points_in_rbbox(points_v[:, :3], gt_boxes_lidar)
        num_points_in_gt = indices.sum(0)
        # Pad with -1 for the ignored ('DontCare') boxes.
        num_ignored = len(annos['dimensions']) - num_obj
        num_points_in_gt = np.concatenate(
            [num_points_in_gt, -np.ones([num_ignored])])
        annos['num_points_in_gt'] = num_points_in_gt.astype(np.int32)


def create_kitti_info_file(data_path,
                           pkl_prefix='kitti',
                           save_path=None,
                           relative_path=True):
    """Create info file of KITTI dataset.

    Given the raw data, generate its related info file in pkl format.

    Args:
        data_path (str): Path of the data root.
        pkl_prefix (str): Prefix of the info file to be generated.
            Default: 'kitti' (yields e.g. ``kitti_infos_train.pkl``).
        save_path (str | None): Path to save the info file.
            Default: None (save under ``data_path``).
        relative_path (bool): Whether to use relative path. Default: True.
    """
    imageset_folder = Path(data_path) / 'ImageSets'
    train_img_ids = _read_imageset_file(str(imageset_folder / 'train.txt'))
    val_img_ids = _read_imageset_file(str(imageset_folder / 'val.txt'))
    test_img_ids = _read_imageset_file(str(imageset_folder / 'test.txt'))

    print('Generate info. this may take several minutes.')
    if save_path is None:
        save_path = Path(data_path)
    else:
        save_path = Path(save_path)
    kitti_infos_train = get_kitti_image_info(
        data_path,
        training=True,
        velodyne=True,
        calib=True,
        image_ids=train_img_ids,
        relative_path=relative_path)
    _calculate_num_points_in_gt(data_path, kitti_infos_train, relative_path)
    filename = save_path / f'{pkl_prefix}_infos_train.pkl'
    print(f'Kitti info train file is saved to {filename}')
    with open(filename, 'wb') as f:
        pickle.dump(kitti_infos_train, f)
    kitti_infos_val = get_kitti_image_info(
        data_path,
        training=True,
        velodyne=True,
        calib=True,
        image_ids=val_img_ids,
        relative_path=relative_path)
    _calculate_num_points_in_gt(data_path, kitti_infos_val, relative_path)
    filename = save_path / f'{pkl_prefix}_infos_val.pkl'
    print(f'Kitti info val file is saved to {filename}')
    with open(filename, 'wb') as f:
        pickle.dump(kitti_infos_val, f)
    filename = save_path / f'{pkl_prefix}_infos_trainval.pkl'
    print(f'Kitti info trainval file is saved to {filename}')
    with open(filename, 'wb') as f:
        pickle.dump(kitti_infos_train + kitti_infos_val, f)

    kitti_infos_test = get_kitti_image_info(
        data_path,
        training=False,
        label_info=False,
        velodyne=True,
        calib=True,
        image_ids=test_img_ids,
        relative_path=relative_path)
    filename = save_path / f'{pkl_prefix}_infos_test.pkl'
    print(f'Kitti info test file is saved to {filename}')
    with open(filename, 'wb') as f:
        pickle.dump(kitti_infos_test, f)


def _create_reduced_point_cloud(data_path,
                                info_path,
                                save_path=None,
                                back=False):
    """Save point clouds reduced to the camera frustum for one info file.

    Args:
        data_path (str): Path of the data root.
        info_path (str): Path of a kitti info pkl file.
        save_path (str | None): Directory to save the reduced clouds.
            Default: None (a sibling ``*_reduced`` directory is used).
        back (bool): Whether to flip the points along the x-axis first;
            output files then get a ``_back`` suffix. Default: False.
    """
    with open(info_path, 'rb') as f:
        kitti_infos = pickle.load(f)

    for info in track_iter_progress(kitti_infos):
        pc_info = info['point_cloud']
        image_info = info['image']
        calib = info['calib']

        v_path = pc_info['velodyne_path']
        v_path = Path(data_path) / v_path
        points_v = np.fromfile(
            str(v_path), dtype=np.float32, count=-1).reshape([-1, 4])
        rect = calib['R0_rect']
        P2 = calib['P2']
        Trv2c = calib['Tr_velo_to_cam']
        if back:
            points_v[:, 0] = -points_v[:, 0]
        # Keep only the points that project into the image.
        points_v = box_np_ops.remove_outside_points(points_v, rect, Trv2c, P2,
                                                    image_info['image_shape'])
        if save_path is None:
            save_dir = v_path.parent.parent / (v_path.parent.stem + '_reduced')
            if not save_dir.exists():
                save_dir.mkdir()
            # Build the name as str: appending '_back' to a Path would raise.
            save_filename = str(save_dir / v_path.name)
        else:
            save_filename = str(Path(save_path) / v_path.name)
        if back:
            save_filename += '_back'
        # ndarray.tofile writes raw binary, so the file must be opened 'wb'.
        with open(save_filename, 'wb') as f:
            points_v.tofile(f)


def create_reduced_point_cloud(data_path,
                               pkl_prefix,
                               train_info_path=None,
                               val_info_path=None,
                               test_info_path=None,
                               save_path=None,
                               with_back=False):
    """Create frustum-reduced point cloud files for all splits.

    Args:
        data_path (str): Path of original infos.
        pkl_prefix (str): Prefix of info files.
        train_info_path (str | None): Path of training set info.
            Default: None.
        val_info_path (str | None): Path of validation set info.
            Default: None.
        test_info_path (str | None): Path of test set info. Default: None.
        save_path (str | None): Path to save reduced point clouds.
        with_back (bool): Whether to additionally create clouds flipped
            along the x-axis (saved with a ``_back`` suffix).
            Default: False.
    """
    if train_info_path is None:
        train_info_path = Path(data_path) / f'{pkl_prefix}_infos_train.pkl'
    if val_info_path is None:
        val_info_path = Path(data_path) / f'{pkl_prefix}_infos_val.pkl'
    if test_info_path is None:
        test_info_path = Path(data_path) / f'{pkl_prefix}_infos_test.pkl'

    print('create reduced point cloud for training set')
    _create_reduced_point_cloud(data_path, train_info_path, save_path)
    print('create reduced point cloud for validation set')
    _create_reduced_point_cloud(data_path, val_info_path, save_path)
    print('create reduced point cloud for testing set')
    _create_reduced_point_cloud(data_path, test_info_path, save_path)
    if with_back:
        _create_reduced_point_cloud(
            data_path, train_info_path, save_path, back=True)
        _create_reduced_point_cloud(
            data_path, val_info_path, save_path, back=True)
        _create_reduced_point_cloud(
            data_path, test_info_path, save_path, back=True)