Commit a79b105b authored by jshilong, committed by ChaimZhu

Refactor kitti dataset

parent 3fa5a430
# Copyright (c) OpenMMLab. All rights reserved.
from .builder import DATASETS, PIPELINES, build_dataset
from .custom_3d import Custom3DDataset
from .custom_3d_seg import Custom3DSegDataset
from .det3d_dataset import Det3DDataset
from .kitti_dataset import KittiDataset
from .kitti_mono_dataset import KittiMonoDataset
from .lyft_dataset import LyftDataset
@@ -36,7 +36,7 @@ __all__ = [
'IndoorPatchPointSample', 'IndoorPointSample', 'PointSample',
'LoadAnnotations3D', 'GlobalAlignment', 'SUNRGBDDataset', 'ScanNetDataset',
'ScanNetSegDataset', 'ScanNetInstanceSegDataset', 'SemanticKITTIDataset',
'Custom3DDataset', 'Custom3DSegDataset', 'LoadPointsFromMultiSweeps',
'Det3DDataset', 'Custom3DSegDataset', 'LoadPointsFromMultiSweeps',
'WaymoDataset', 'BackgroundPointsFilter', 'VoxelBasedPointSampler',
'get_loading_pipeline', 'RandomDropPointsColor', 'RandomJitterPoints',
'ObjectNameFilter', 'AffineResize', 'RandomShiftScale',
......
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import tempfile
import warnings
from os import path as osp
from typing import Callable, List, Optional, Union
import mmcv
import numpy as np
from torch.utils.data import Dataset
from mmengine.dataset import BaseDataset
from mmdet3d.registry import DATASETS
from mmdet3d.datasets import DATASETS
from ..core.bbox import get_box_type
from .pipelines import Compose
from .utils import extract_result_dict, get_loading_pipeline
@DATASETS.register_module()
class Custom3DDataset(Dataset):
"""Customized 3D dataset.
class Det3DDataset(BaseDataset):
"""Base Class of 3D dataset.
This is the base dataset of SUNRGB-D, ScanNet, nuScenes, and KITTI
dataset.
.. code-block:: none
[
{'sample_idx':
'lidar_points': {'lidar_path': velodyne_path,
....
},
'annos': {'box_type_3d': (str) 'LiDAR/Camera/Depth'
'gt_bboxes_3d': <np.ndarray> (n, 7)
'gt_names': [list]
....
}
'calib': { .....}
'images': { .....}
}
]
# TODO: doc link here for the standard data format
Args:
data_root (str): Path of dataset root.
ann_file (str): Path of annotation file.
data_root (str, optional): The root directory for ``data_prefix`` and
``ann_file``. Defaults to None.
ann_file (str): Annotation file path. Defaults to ''.
metainfo (dict, optional): Meta information for dataset, such as class
information. Defaults to None.
data_prefix (dict, optional): Prefix for training data. Defaults to
dict(pts='velodyne', img="").
pipeline (list[dict], optional): Pipeline used for data processing.
Defaults to None.
classes (tuple[str], optional): Classes used in the dataset.
Defaults to None.
modality (dict, optional): Modality to specify the sensor data used
as input. Defaults to None.
as input. It usually contains the following keys:
- use_camera: bool
- use_lidar: bool
Defaults to `dict(use_lidar=True, use_camera=False)`
box_type_3d (str, optional): Type of 3D box of this dataset.
Based on the `box_type_3d`, the dataset will encapsulate the box
in its original format and then convert it to `box_type_3d`.
Defaults to 'LiDAR'. Available options include:
- 'LiDAR': Box in LiDAR coordinates.
- 'Depth': Box in depth coordinates, usually for indoor dataset.
- 'Camera': Box in camera coordinates.
filter_empty_gt (bool, optional): Whether to filter empty GT.
Defaults to True.
- 'LiDAR': Box in LiDAR coordinates, usually for
outdoor point cloud 3d detection.
- 'Depth': Box in depth coordinates, usually for
indoor point cloud 3d detection.
- 'Camera': Box in camera coordinates, usually
for vision-based 3d detection.
filter_empty_gt (bool, optional): Whether to filter the data with
empty GT. Defaults to True.
test_mode (bool, optional): Whether the dataset is in test mode.
Defaults to False.
"""
def __init__(self,
data_root,
ann_file,
pipeline=None,
classes=None,
modality=None,
box_type_3d='LiDAR',
filter_empty_gt=True,
test_mode=False,
file_client_args=dict(backend='disk')):
super().__init__()
self.data_root = data_root
self.ann_file = ann_file
self.test_mode = test_mode
self.modality = modality
data_root: Optional[str] = None,
ann_file: str = '',
metainfo: Optional[dict] = None,
data_prefix: dict = dict(pts='velodyne', img=''),
pipeline: List[Union[dict, Callable]] = [],
modality: dict = dict(use_lidar=True, use_camera=False),
box_type_3d: str = 'LiDAR',
filter_empty_gt: bool = True,
test_mode: bool = False,
file_client_args: dict = dict(backend='disk'),
**kwargs):
# init file client
self.file_client = mmcv.FileClient(**file_client_args)
self.filter_empty_gt = filter_empty_gt
self.box_type_3d, self.box_mode_3d = get_box_type(box_type_3d)
_default_modality_keys = ('use_lidar', 'use_camera')
if modality is None:
modality = dict()
# Defaults to False if not specify
for key in _default_modality_keys:
if key not in modality:
modality[key] = False
self.modality = modality
assert self.modality['use_lidar'] or self.modality['use_camera'], (
'Please specify the `modality` (`use_lidar` '
f'or `use_camera`) for {self.__class__.__name__}')
self.CLASSES = self.get_classes(classes)
self.file_client = mmcv.FileClient(**file_client_args)
self.cat2id = {name: i for i, name in enumerate(self.CLASSES)}
self.box_type_3d, self.box_mode_3d = get_box_type(box_type_3d)
# load annotations
if hasattr(self.file_client, 'get_local_path'):
with self.file_client.get_local_path(self.ann_file) as local_path:
self.data_infos = self.load_annotations(open(local_path, 'rb'))
if metainfo is not None and 'CLASSES' in metainfo:
# we allow training on a subset of self.METAINFO['CLASSES']
# map unselected labels to -1
self.label_mapping = {
i: -1
for i in range(len(self.METAINFO['CLASSES']))
}
self.label_mapping[-1] = -1
for label_idx, name in enumerate(metainfo['CLASSES']):
ori_label = self.METAINFO['CLASSES'].index(name)
self.label_mapping[ori_label] = label_idx
else:
warnings.warn(
'The used MMCV version does not have get_local_path. '
f'We treat the {self.ann_file} as local paths and it '
'might cause errors if the path is not a local path. '
'Please use MMCV>= 1.3.16 if you meet errors.')
self.data_infos = self.load_annotations(self.ann_file)
# process pipeline
if pipeline is not None:
self.pipeline = Compose(pipeline)
# set group flag for the samplers
if not self.test_mode:
self._set_group_flag()
def load_annotations(self, ann_file):
"""Load annotations from ann_file.
self.label_mapping = {
i: i
for i in range(len(self.METAINFO['CLASSES']))
}
self.label_mapping[-1] = -1
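# For example (hypothetical subset): with METAINFO['CLASSES'] =
# ('Pedestrian', 'Cyclist', 'Car') and metainfo['CLASSES'] = ['Car'],
# label_mapping becomes {0: -1, 1: -1, 2: 0, -1: -1}, so Pedestrian
# and Cyclist instances are mapped to the ignore label -1.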
super().__init__(
ann_file=ann_file,
metainfo=metainfo,
data_root=data_root,
data_prefix=data_prefix,
pipeline=pipeline,
test_mode=test_mode,
**kwargs)
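For reference, a minimal sketch of instantiating a subclass with these constructor arguments; the paths and class list are hypothetical, mirroring the test at the bottom of this commit:

from mmdet3d.datasets import KittiDataset

dataset = KittiDataset(
    data_root='data/kitti',
    ann_file='kitti_infos_train.pkl',
    # data_prefix/metainfo are forwarded to the mmengine BaseDataset
    data_prefix=dict(pts='training/velodyne_reduced',
                     img='training/image_2'),
    metainfo=dict(CLASSES=['Pedestrian', 'Cyclist', 'Car']),
    modality=dict(use_lidar=True, use_camera=False),
    box_type_3d='LiDAR')
print(len(dataset))  # number of loaded data infos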
def _remove_dontcare(self, ann_info):
"""Remove annotations that do not need to be cared.
-1 indicate dontcare in MMDet3d.
Args:
ann_file (str): Path of the annotation file.
ann_info (dict): Dict of annotation infos. Instances
with the label `-1` will be removed.
Returns:
list[dict]: List of annotations.
dict: Annotations after filtering.
"""
# loading data from a file-like object needs file format
return mmcv.load(ann_file, file_format='pkl')
img_filtered_annotations = {}
filter_mask = ann_info['gt_labels_3d'] > -1
for key in ann_info.keys():
img_filtered_annotations[key] = (ann_info[key][filter_mask])
return img_filtered_annotations
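A small worked example (hypothetical values) of the mask-based filtering above:

import numpy as np

ann_info = dict(
    gt_labels_3d=np.array([0, -1, 2]),        # -1 marks DontCare
    gt_bboxes_3d=np.zeros((3, 7)))
filter_mask = ann_info['gt_labels_3d'] > -1   # [True, False, True]
filtered = {key: ann_info[key][filter_mask] for key in ann_info}
# filtered['gt_labels_3d'] -> array([0, 2]); the middle box is dropped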
def get_ann_info(self, index: int) -> dict:
"""Get annotation info according to the given index.
def get_data_info(self, index):
"""Get data info according to the given index.
Use the index to get the corresponding annotations, so that the
eval hook can also use this API.
Args:
index (int): Index of the sample data to get.
index (int): Index of the annotation data to get.
Returns:
dict: Data information that will be passed to the data
preprocessing pipelines. It includes the following keys:
- sample_idx (str): Sample index.
- pts_filename (str): Filename of point clouds.
- file_name (str): Filename of point clouds.
- ann_info (dict): Annotation info.
dict: annotation information.
"""
info = self.data_infos[index]
sample_idx = info['sample_idx']
pts_filename = osp.join(self.data_root,
info['lidar_points']['lidar_path'])
data_info = self.get_data_info(index)
# test mode
if 'ann_info' not in data_info:
ann_info = self.parse_ann_info(data_info)
else:
ann_info = data_info['ann_info']
input_dict = dict(
pts_filename=pts_filename,
sample_idx=sample_idx,
file_name=pts_filename)
return ann_info
if not self.test_mode:
annos = self.get_ann_info(index)
input_dict['ann_info'] = annos
if self.filter_empty_gt and ~(annos['gt_labels_3d'] != -1).any():
return None
return input_dict
def parse_ann_info(self, info: dict) -> dict:
"""Process the `instances` in data info to `ann_info`
def get_ann_info(self, index):
"""Get annotation info according to the given index.
In `Det3DDataset`, we simply concatenate all the fields
in `instances` into `np.ndarray`; subclasses can implement
dataset-specific processing. You have to convert `gt_bboxes_3d`
into different coordinate systems according to the task.
Args:
index (int): Index of the annotation data to get.
info (dict): Info dict.
Returns:
dict: Annotation information consists of the following keys:
- gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`):
3D ground truth bboxes
- gt_labels_3d (np.ndarray): Labels of ground truths.
- gt_names (list[str]): Class names of ground truths.
dict: Processed `ann_info`
"""
info = self.data_infos[index]
gt_bboxes_3d = info['annos']['gt_bboxes_3d']
gt_names_3d = info['annos']['gt_names']
gt_labels_3d = []
for cat in gt_names_3d:
if cat in self.CLASSES:
gt_labels_3d.append(self.CLASSES.index(cat))
else:
gt_labels_3d.append(-1)
gt_labels_3d = np.array(gt_labels_3d)
# Obtain original box 3d type in info file
ori_box_type_3d = info['annos']['box_type_3d']
ori_box_type_3d, _ = get_box_type(ori_box_type_3d)
# turn original box type to target box type
gt_bboxes_3d = ori_box_type_3d(
gt_bboxes_3d,
box_dim=gt_bboxes_3d.shape[-1],
origin=(0.5, 0.5, 0.5)).convert_to(self.box_mode_3d)
anns_results = dict(
gt_bboxes_3d=gt_bboxes_3d,
gt_labels_3d=gt_labels_3d,
gt_names=gt_names_3d)
return anns_results
def pre_pipeline(self, results):
"""Initialization before data preparation.
# rename most keys after concatenation, e.g. 'bbox_label_3d' -> 'gt_labels_3d'
name_mapping = {
'bbox_label': 'gt_labels',
'bbox_label_3d': 'gt_labels_3d',
'bbox': 'gt_bboxes',
'bbox_3d': 'gt_bboxes_3d',
'depth': 'depths',
'center_2d': 'centers_2d',
'attr_label': 'attr_labels'
}
Args:
results (dict): Dict before data preprocessing.
- img_fields (list): Image fields.
- bbox3d_fields (list): 3D bounding boxes fields.
- pts_mask_fields (list): Mask fields of points.
- pts_seg_fields (list): Mask fields of point segments.
- bbox_fields (list): Fields of bounding boxes.
- mask_fields (list): Fields of masks.
- seg_fields (list): Segment fields.
- box_type_3d (str): 3D box type.
- box_mode_3d (str): 3D box mode.
"""
results['img_fields'] = []
results['bbox3d_fields'] = []
results['pts_mask_fields'] = []
results['pts_seg_fields'] = []
results['bbox_fields'] = []
results['mask_fields'] = []
results['seg_fields'] = []
results['box_type_3d'] = self.box_type_3d
results['box_mode_3d'] = self.box_mode_3d
def prepare_train_data(self, index):
"""Training data preparation.
instances = info['instances']
keys = list(instances[0].keys())
ann_info = dict()
for ann_name in keys:
temp_anns = [item[ann_name] for item in instances]
if 'label' in ann_name:
temp_anns = [self.label_mapping[item] for item in temp_anns]
temp_anns = np.array(temp_anns)
if ann_name in name_mapping:
ann_name = name_mapping[ann_name]
ann_info[ann_name] = temp_anns
return ann_info
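To make the concatenation and renaming concrete, a sketch with hypothetical `instances` (label_mapping as if only 'Car' were selected):

import numpy as np

label_mapping = {0: -1, 1: -1, 2: 0, -1: -1}
instances = [dict(bbox_label_3d=2, bbox_3d=[0., 0., 0., 4., 2., 2., 0.]),
             dict(bbox_label_3d=0, bbox_3d=[5., 0., 0., 1., 1., 2., 0.])]
labels = np.array([label_mapping[inst['bbox_label_3d']]
                   for inst in instances])
bboxes = np.array([inst['bbox_3d'] for inst in instances])
# after the name_mapping rename: 'bbox_label_3d' -> 'gt_labels_3d', etc.
ann_info = dict(gt_labels_3d=labels, gt_bboxes_3d=bboxes)
# ann_info['gt_labels_3d'] -> array([ 0, -1])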
def parse_data_info(self, info: dict) -> dict:
"""Process the raw data info.
Convert the relative paths of required modality data files to
absolute paths, and process the `instances` field into
`ann_info` in the training stage.
Args:
index (int): Index for accessing the target data.
info (dict): Raw info dict.
Returns:
dict: Training data dict of the corresponding index.
dict: Contains `ann_info` in the training stage, with all
paths converted to absolute paths.
"""
input_dict = self.get_data_info(index)
if input_dict is None:
return None
self.pre_pipeline(input_dict)
example = self.pipeline(input_dict)
if self.filter_empty_gt and \
(example is None or
~(example['gt_labels_3d']._data != -1).any()):
return None
return example
def prepare_test_data(self, index):
"""Prepare data for testing.
if self.modality['use_lidar']:
info['lidar_points']['lidar_path'] = \
osp.join(
self.data_prefix.get('pts', ''),
info['lidar_points']['lidar_path'])
if self.modality['use_camera']:
for cam_id, img_info in info['images'].items():
if 'img_path' in img_info:
img_info['img_path'] = osp.join(
self.data_prefix.get('img', ''), img_info['img_path'])
if not self.test_mode:
info['ann_info'] = self.parse_ann_info(info)
return info
def prepare_data(self, index):
"""Data preparation for both training and testing stage.
Called by `__getitem__` of dataset.
Args:
index (int): Index for accessing the target data.
Returns:
dict: Testing data dict of the corresponding index.
dict: Data dict of the corresponding index.
"""
input_dict = self.get_data_info(index)
self.pre_pipeline(input_dict)
example = self.pipeline(input_dict)
return example
@classmethod
def get_classes(cls, classes=None):
"""Get class names of current dataset.
# deepcopy here to avoid inplace modification in pipeline.
input_dict = copy.deepcopy(input_dict)
Args:
classes (Sequence[str] | str): If classes is None, use
default CLASSES defined by builtin dataset. If classes is a
string, take it as a file name. The file contains the name of
classes where each line contains one class name. If classes is
a tuple or list, override the CLASSES defined by the dataset.
Return:
list[str]: A list of class names.
"""
if classes is None:
return cls.CLASSES
if isinstance(classes, str):
# take it as a file path
class_names = mmcv.list_from_file(classes)
elif isinstance(classes, (tuple, list)):
class_names = classes
else:
raise ValueError(f'Unsupported type {type(classes)} of classes.')
# box_type_3d (str): 3D box type.
input_dict['box_type_3d'] = self.box_type_3d
# box_mode_3d (str): 3D box mode.
input_dict['box_mode_3d'] = self.box_mode_3d
return class_names
# before the pipeline: return None so `__getitem__` randomly samples another index
if not self.test_mode and self.filter_empty_gt:
if len(input_dict['ann_info']['gt_labels_3d']) == 0:
return None
example = self.pipeline(input_dict)
if not self.test_mode and self.filter_empty_gt:
# after the pipeline: drop the example with empty annotations and
# return None so `__getitem__` randomly samples another index
if example is None or len(example['gt_labels_3d']) == 0:
return None
return example
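Since `prepare_data` can return None for filtered samples, the caller is expected to re-sample; mmengine's `BaseDataset.__getitem__` does this internally (bounded by its `max_refetch` argument). A rough standalone sketch of that consumer logic, paraphrased rather than copied from mmengine:

import numpy as np

def fetch_valid_sample(dataset, idx, max_refetch=10):
    # retry with random indices until prepare_data yields a valid dict
    for _ in range(max_refetch):
        data = dataset.prepare_data(idx)
        if data is not None:
            return data
        idx = np.random.randint(len(dataset))
    raise RuntimeError('cannot find a valid sample to train')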
def format_results(self,
outputs,
@@ -350,11 +333,13 @@ class Custom3DDataset(Dataset):
return ret_dict
# TODO: check where this method is used
def _build_default_pipeline(self):
"""Build the default pipeline for this dataset."""
raise NotImplementedError('_build_default_pipeline is not implemented '
f'for dataset {self.__class__.__name__}')
# TODO: check where this method is used
def _get_pipeline(self, pipeline):
"""Get data loading pipeline in self.show/evaluate function.
@@ -372,6 +357,7 @@ class Custom3DDataset(Dataset):
return Compose(loading_pipeline)
return Compose(pipeline)
# TODO: check where this method is used
def _extract_data(self, index, pipeline, key, load_annos=False):
"""Load data using input pipeline and extract data according to key.
@@ -405,44 +391,3 @@ class Custom3DDataset(Dataset):
self.test_mode = original_test_mode
return data
def __len__(self):
"""Return the length of data infos.
Returns:
int: Length of data infos.
"""
return len(self.data_infos)
def _rand_another(self, idx):
"""Randomly get another item with the same flag.
Returns:
int: Another index of item with the same flag.
"""
pool = np.where(self.flag == self.flag[idx])[0]
return np.random.choice(pool)
def __getitem__(self, idx):
"""Get item from infos according to the given index.
Returns:
dict: Data dictionary of the corresponding index.
"""
if self.test_mode:
return self.prepare_test_data(idx)
while True:
data = self.prepare_train_data(idx)
if data is None:
idx = self._rand_another(idx)
continue
return data
def _set_group_flag(self):
"""Set flag according to image aspect ratio.
Images with aspect ratio greater than 1 will be set as group 1,
otherwise group 0. In 3D datasets, they are all the same, thus are all
zeros.
"""
self.flag = np.zeros(len(self), dtype=np.uint8)
@@ -2,12 +2,12 @@
import mmcv
import numpy as np
from mmdet3d.datasets import CustomDataset
from mmdet3d.datasets import Det3DDataset
from mmdet3d.registry import DATASETS
@DATASETS.register_module()
class Kitti2DDataset(CustomDataset):
class Kitti2DDataset(Det3DDataset):
r"""KITTI 2D Dataset.
This class serves as the API for experiments on the `KITTI Dataset
......
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import os
import tempfile
from os import path as osp
from typing import Callable, List, Optional, Union
import mmcv
import numpy as np
import torch
from mmcv.utils import print_log
from mmdet3d.registry import DATASETS
from mmdet3d.datasets import DATASETS
from ..core import show_multi_modality_result, show_result
from ..core.bbox import (Box3DMode, CameraInstance3DBoxes, Coord3DMode,
LiDARInstance3DBoxes, points_cam2img)
from .custom_3d import Custom3DDataset
from .det3d_dataset import Det3DDataset
from .pipelines import Compose
@DATASETS.register_module()
class KittiDataset(Custom3DDataset):
class KittiDataset(Det3DDataset):
r"""KITTI Dataset.
This class serves as the API for experiments on the `KITTI Dataset
@@ -28,12 +28,8 @@ class KittiDataset(Custom3DDataset):
data_root (str): Path of dataset root.
ann_file (str): Path of annotation file.
split (str): Split of input data.
pts_prefix (str, optional): Prefix of points files.
Defaults to 'velodyne'.
pipeline (list[dict], optional): Pipeline used for data processing.
Defaults to None.
classes (tuple[str], optional): Classes used in the dataset.
Defaults to None.
modality (dict, optional): Modality to specify the sensor data used
as input. Defaults to None.
box_type_3d (str, optional): Type of 3D box of this dataset.
@@ -52,220 +48,107 @@ class KittiDataset(Custom3DDataset):
filter invalid predicted boxes.
Default: [0, -40, -3, 70.4, 40, 0.0].
"""
CLASSES = ('car', 'pedestrian', 'cyclist')
# TODO: use full classes of kitti
METAINFO = {'CLASSES': ('Pedestrian', 'Cyclist', 'Car')}
def __init__(self,
data_root,
ann_file,
split,
pts_prefix='velodyne',
pipeline=None,
classes=None,
modality=None,
box_type_3d='LiDAR',
filter_empty_gt=True,
test_mode=False,
pcd_limit_range=[0, -40, -3, 70.4, 40, 0.0],
data_root: str,
ann_file: str,
pipeline: List[Union[dict, Callable]] = [],
modality: Optional[dict] = None,
box_type_3d: str = 'LiDAR',
filter_empty_gt: bool = True,
test_mode: bool = False,
pcd_limit_range: List[float] = [0, -40, -3, 70.4, 40, 0.0],
**kwargs):
self.pcd_limit_range = pcd_limit_range
super().__init__(
data_root=data_root,
ann_file=ann_file,
pipeline=pipeline,
classes=classes,
modality=modality,
box_type_3d=box_type_3d,
filter_empty_gt=filter_empty_gt,
test_mode=test_mode,
**kwargs)
self.split = split
self.root_split = os.path.join(self.data_root, split)
assert self.modality is not None
self.pcd_limit_range = pcd_limit_range
self.pts_prefix = pts_prefix
assert box_type_3d.lower() in ('lidar', 'camera')
def _get_pts_filename(self, idx):
"""Get point cloud filename according to the given index.
Args:
index (int): Index of the point cloud file to get.
Returns:
str: Name of the point cloud file.
"""
pts_filename = osp.join(self.root_split, self.pts_prefix,
f'{idx:06d}.bin')
return pts_filename
def parse_data_info(self, info: dict) -> dict:
"""Process the raw data info.
def get_data_info(self, index):
"""Get data info according to the given index.
The only difference from `parse_data_info` in `Det3DDataset`
is the extra processing of the ground `plane`.
Args:
index (int): Index of the sample data to get.
info (dict): Raw info dict.
Returns:
dict: Data information that will be passed to the data
preprocessing pipelines. It includes the following keys:
- sample_idx (str): Sample index.
- pts_filename (str): Filename of point clouds.
- img_prefix (str): Prefix of image files.
- img_info (dict): Image info.
- lidar2img (list[np.ndarray], optional): Transformations
from lidar to different cameras.
- ann_info (dict): Annotation info.
dict: Contains `ann_info` in the training stage, with all
paths converted to absolute paths.
"""
info = self.data_infos[index]
sample_idx = info['image']['image_idx']
img_filename = os.path.join(self.data_root,
info['image']['image_path'])
# TODO: consider using torch.Tensor only
rect = info['calib']['R0_rect'].astype(np.float32)
Trv2c = info['calib']['Tr_velo_to_cam'].astype(np.float32)
P2 = info['calib']['P2'].astype(np.float32)
lidar2img = P2 @ rect @ Trv2c
if self.modality['use_lidar']:
if 'plane' in info:
# convert ground plane to velodyne coordinates
plane = np.array(info['plane'])
lidar2cam = np.array(info['lidar_points']['lidar2cam'])
reverse = np.linalg.inv(lidar2cam)
(plane_norm_cam, plane_off_cam) = (plane[:3],
-plane[:3] * plane[3])
plane_norm_lidar = \
(reverse[:3, :3] @ plane_norm_cam[:, None])[:, 0]
plane_off_lidar = (
reverse[:3, :3] @ plane_off_cam[:, None][:, 0] +
reverse[:3, 3])
plane_lidar = np.zeros_like(plane_norm_lidar, shape=(4, ))
plane_lidar[:3] = plane_norm_lidar
plane_lidar[3] = -plane_norm_lidar.T @ plane_off_lidar
else:
plane_lidar = None
pts_filename = self._get_pts_filename(sample_idx)
input_dict = dict(
sample_idx=sample_idx,
pts_filename=pts_filename,
img_prefix=None,
img_info=dict(filename=img_filename),
lidar2img=lidar2img)
info['plane'] = plane_lidar
if not self.test_mode:
annos = self.get_ann_info(index)
input_dict['ann_info'] = annos
info = super().parse_data_info(info)
return input_dict
return info
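The plane branch above converts a ground plane given in camera coordinates (normal n and offset d with n·x + d = 0) into LiDAR coordinates: the normal is rotated, a point on the plane is fully transformed, and the new offset is recomputed. A minimal standalone sketch of the same math with hypothetical calibration values:

import numpy as np

plane = np.array([0., -1., 0., 1.65])  # hypothetical camera-frame plane
lidar2cam = np.eye(4)                  # hypothetical calibration matrix
reverse = np.linalg.inv(lidar2cam)

plane_norm_cam = plane[:3]
plane_off_cam = -plane[:3] * plane[3]  # a point on the plane
plane_norm_lidar = reverse[:3, :3] @ plane_norm_cam
plane_off_lidar = reverse[:3, :3] @ plane_off_cam + reverse[:3, 3]

plane_lidar = np.zeros(4)
plane_lidar[:3] = plane_norm_lidar
plane_lidar[3] = -plane_norm_lidar @ plane_off_lidar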
def get_ann_info(self, index):
def parse_ann_info(self, info):
"""Get annotation info according to the given index.
Args:
index (int): Index of the annotation data to get.
info (dict): Data information of single data sample.
Returns:
dict: annotation information consists of the following keys:
- gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`):
- bboxes_3d (:obj:`LiDARInstance3DBoxes`):
3D ground truth bboxes.
- gt_labels_3d (np.ndarray): Labels of ground truths.
- bbox_labels_3d (np.ndarray): Labels of ground truths.
- gt_bboxes (np.ndarray): 2D ground truth bboxes.
- gt_labels (np.ndarray): Labels of ground truths.
- gt_names (list[str]): Class names of ground truths.
- difficulty (int): Difficulty defined by KITTI.
0, 1, 2 represent easy, moderate, and hard respectively.
"""
# use the index to get the annos so that the eval hook can also use this API
info = self.data_infos[index]
rect = info['calib']['R0_rect'].astype(np.float32)
Trv2c = info['calib']['Tr_velo_to_cam'].astype(np.float32)
if 'plane' in info:
# convert ground plane to velodyne coordinates
reverse = np.linalg.inv(rect @ Trv2c)
(plane_norm_cam,
plane_off_cam) = (info['plane'][:3],
-info['plane'][:3] * info['plane'][3])
plane_norm_lidar = \
(reverse[:3, :3] @ plane_norm_cam[:, None])[:, 0]
plane_off_lidar = (
reverse[:3, :3] @ plane_off_cam[:, None][:, 0] +
reverse[:3, 3])
plane_lidar = np.zeros_like(plane_norm_lidar, shape=(4, ))
plane_lidar[:3] = plane_norm_lidar
plane_lidar[3] = -plane_norm_lidar.T @ plane_off_lidar
else:
plane_lidar = None
difficulty = info['annos']['difficulty']
annos = info['annos']
# we need the other objects to avoid collisions when sampling
annos = self.remove_dontcare(annos)
loc = annos['location']
dims = annos['dimensions']
rots = annos['rotation_y']
gt_names = annos['name']
gt_bboxes_3d = np.concatenate([loc, dims, rots[..., np.newaxis]],
axis=1).astype(np.float32)
# convert gt_bboxes_3d to velodyne coordinates
gt_bboxes_3d = CameraInstance3DBoxes(gt_bboxes_3d).convert_to(
self.box_mode_3d, np.linalg.inv(rect @ Trv2c))
gt_bboxes = annos['bbox']
selected = self.drop_arrays_by_name(gt_names, ['DontCare'])
gt_bboxes = gt_bboxes[selected].astype('float32')
gt_names = gt_names[selected]
gt_labels = []
for cat in gt_names:
if cat in self.CLASSES:
gt_labels.append(self.CLASSES.index(cat))
else:
gt_labels.append(-1)
gt_labels = np.array(gt_labels).astype(np.int64)
gt_labels_3d = copy.deepcopy(gt_labels)
anns_results = dict(
gt_bboxes_3d=gt_bboxes_3d,
gt_labels_3d=gt_labels_3d,
bboxes=gt_bboxes,
labels=gt_labels,
gt_names=gt_names,
plane=plane_lidar,
difficulty=difficulty)
return anns_results
def drop_arrays_by_name(self, gt_names, used_classes):
"""Drop irrelevant ground truths by name.
ann_info = super().parse_ann_info(info)
Args:
gt_names (list[str]): Names of ground truths.
used_classes (list[str]): Classes of interest.
bbox_labels_3d = ann_info['gt_labels_3d']
bbox_labels_3d = np.array(bbox_labels_3d)
ann_info['gt_labels_3d'] = bbox_labels_3d
ann_info['gt_labels'] = copy.deepcopy(ann_info['gt_labels_3d'])
ann_info = self._remove_dontcare(ann_info)
Returns:
np.ndarray: Indices of ground truths that will be dropped.
"""
inds = [i for i, x in enumerate(gt_names) if x not in used_classes]
inds = np.array(inds, dtype=np.int64)
return inds
# in kitti, lidar2cam = R0_rect @ Tr_velo_to_cam
lidar2cam = np.array(info['images']['CAM2']['lidar2cam'])
# convert gt_bboxes_3d to velodyne coordinates with `lidar2cam`
gt_bboxes_3d = CameraInstance3DBoxes(
ann_info['gt_bboxes_3d']).convert_to(self.box_mode_3d,
np.linalg.inv(lidar2cam))
ann_info['gt_bboxes_3d'] = gt_bboxes_3d
def keep_arrays_by_name(self, gt_names, used_classes):
"""Keep useful ground truths by name.
Args:
gt_names (list[str]): Names of ground truths.
used_classes (list[str]): Classes of interest.
Returns:
np.ndarray: Indices of ground truths that will be kept.
"""
inds = [i for i, x in enumerate(gt_names) if x in used_classes]
inds = np.array(inds, dtype=np.int64)
return inds
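For instance (hypothetical names), the two helpers return complementary index sets:

import numpy as np

gt_names = np.array(['Car', 'DontCare', 'Pedestrian'])
used_classes = ('Pedestrian', 'Cyclist', 'Car')
keep = np.array([i for i, x in enumerate(gt_names) if x in used_classes])
drop = np.array([i for i, x in enumerate(gt_names) if x not in used_classes])
# keep -> array([0, 2]), drop -> array([1])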
def remove_dontcare(self, ann_info):
"""Remove annotations that do not need to be cared.
Args:
ann_info (dict): Dict of annotation infos. The ``'DontCare'``
annotations will be removed according to ``ann_info['name']``.
Returns:
dict: Annotations after filtering.
"""
img_filtered_annotations = {}
relevant_annotation_indices = [
i for i, x in enumerate(ann_info['name']) if x != 'DontCare'
]
for key in ann_info.keys():
img_filtered_annotations[key] = (
ann_info[key][relevant_annotation_indices])
return img_filtered_annotations
return ann_info
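The `convert_to` call above maps KITTI's camera-frame boxes into the target LiDAR frame via the inverse of `lidar2cam`. A standalone sketch of the same call with hypothetical values (identity calibration for brevity):

import numpy as np
from mmdet3d.core.bbox import Box3DMode, CameraInstance3DBoxes

cam_boxes = CameraInstance3DBoxes(
    np.array([[1.0, 1.5, 10.0, 1.6, 1.5, 3.9, 0.0]]))  # one (N, 7) box
lidar2cam = np.eye(4, dtype=np.float32)  # hypothetical calibration
lidar_boxes = cam_boxes.convert_to(Box3DMode.LIDAR,
                                   np.linalg.inv(lidar2cam))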
def format_results(self,
outputs,
......
@@ -14,12 +14,12 @@ from mmdet3d.core.evaluation.lyft_eval import lyft_eval
from mmdet3d.registry import DATASETS
from ..core import show_result
from ..core.bbox import Box3DMode, Coord3DMode, LiDARInstance3DBoxes
from .custom_3d import Custom3DDataset
from .det3d_dataset import Det3DDataset
from .pipelines import Compose
@DATASETS.register_module()
class LyftDataset(Custom3DDataset):
class LyftDataset(Det3DDataset):
r"""Lyft Dataset.
This class serves as the API for experiments on the Lyft Dataset.
......
@@ -10,12 +10,12 @@ from nuscenes.utils.data_classes import Box as NuScenesBox
from mmdet3d.registry import DATASETS
from ..core import show_result
from ..core.bbox import Box3DMode, Coord3DMode, LiDARInstance3DBoxes
from .custom_3d import Custom3DDataset
from .det3d_dataset import Det3DDataset
from .pipelines import Compose
@DATASETS.register_module()
class NuScenesDataset(Custom3DDataset):
class NuScenesDataset(Det3DDataset):
r"""NuScenes Dataset.
This class serves as the API for experiments on the NuScenes Dataset.
......
@@ -6,13 +6,13 @@ import numpy as np
from mmdet3d.core import show_seg_result
from mmdet3d.core.bbox import DepthInstance3DBoxes
from mmdet3d.registry import DATASETS
from .custom_3d import Custom3DDataset
from .custom_3d_seg import Custom3DSegDataset
from .det3d_dataset import Det3DDataset
from .pipelines import Compose
@DATASETS.register_module()
class S3DISDataset(Custom3DDataset):
class S3DISDataset(Det3DDataset):
r"""S3DIS Dataset for Detection Task.
This class is the inner dataset for S3DIS. Since S3DIS has 6 areas, we
......
@@ -8,13 +8,13 @@ import numpy as np
from mmdet3d.core import instance_seg_eval, show_result, show_seg_result
from mmdet3d.core.bbox import DepthInstance3DBoxes
from mmdet3d.registry import DATASETS
from .custom_3d import Custom3DDataset
from .custom_3d_seg import Custom3DSegDataset
from .det3d_dataset import Det3DDataset
from .pipelines import Compose
@DATASETS.register_module()
class ScanNetDataset(Custom3DDataset):
class ScanNetDataset(Det3DDataset):
r"""ScanNet Dataset for Detection Task.
This class serves as the API for experiments on the ScanNet Dataset.
......
@@ -2,11 +2,11 @@
from os import path as osp
from mmdet3d.registry import DATASETS
from .custom_3d import Custom3DDataset
from .det3d_dataset import Det3DDataset
@DATASETS.register_module()
class SemanticKITTIDataset(Custom3DDataset):
class SemanticKITTIDataset(Det3DDataset):
r"""SemanticKITTI Dataset.
This class serves as the API for experiments on the SemanticKITTI Dataset
......
@@ -8,12 +8,12 @@ from mmdet3d.core import show_multi_modality_result, show_result
from mmdet3d.core.bbox import DepthInstance3DBoxes
from mmdet3d.registry import DATASETS
from mmdet.core import eval_map
from .custom_3d import Custom3DDataset
from .det3d_dataset import Det3DDataset
from .pipelines import Compose
@DATASETS.register_module()
class SUNRGBDDataset(Custom3DDataset):
class SUNRGBDDataset(Det3DDataset):
r"""SUNRGBD Dataset.
This class serves as the API for experiments on the SUNRGBD Dataset.
......
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
from mmdet3d.core import LiDARInstance3DBoxes
from mmdet3d.datasets import KittiDataset
def _generate_kitti_dataset_config():
data_root = 'tests/data/kitti'
ann_file = 'kitti_infos_train.pkl'
classes = ['Pedestrian', 'Cyclist', 'Car']
# waiting for the pipeline refactor
pipeline = [
dict(
type='LoadPointsFromFile',
coord_type='LIDAR',
load_dim=4,
use_dim=4,
file_client_args=dict(backend='disk')),
dict(
type='MultiScaleFlipAug3D',
img_scale=(1333, 800),
pts_scale_ratio=1,
flip=False,
transforms=[
dict(
type='GlobalRotScaleTrans',
rot_range=[0, 0],
scale_ratio_range=[1.0, 1.0],
translation_std=[0, 0, 0]),
dict(type='RandomFlip3D'),
dict(
type='PointsRangeFilter',
point_cloud_range=[0, -40, -3, 70.4, 40, 1]),
dict(
type='DefaultFormatBundle3D',
class_names=classes,
with_label=False),
dict(type='Collect3D', keys=['points'])
])
]
modality = dict(use_lidar=True, use_camera=False)
data_prefix = dict(pts='training/velodyne_reduced', img='training/image_2')
return data_root, ann_file, classes, data_prefix, pipeline, modality
def test_getitem():
np.random.seed(0)
data_root, ann_file, classes, data_prefix, \
_, modality, = _generate_kitti_dataset_config()
modality['use_camera'] = True
from mmcv.transforms.base import BaseTransform
from mmengine.registry import TRANSFORMS
@TRANSFORMS.register_module()
class Identity(BaseTransform):
def transform(self, info):
if 'ann_info' in info:
info['gt_labels_3d'] = info['ann_info']['gt_labels_3d']
return info
pipeline = [
dict(type='Identity'),
]
kitti_dataset = KittiDataset(
data_root,
ann_file,
data_prefix=dict(
pts='training/velodyne_reduced',
img='training/image_2',
),
pipeline=pipeline,
metainfo=dict(CLASSES=classes),
modality=modality)
kitti_dataset.prepare_data(0)
input_dict = kitti_dataset.get_data_info(0)
kitti_dataset[0]
# assert that the path contains both data_prefix and data_root
assert data_prefix['pts'] in input_dict['lidar_points']['lidar_path']
assert data_root in input_dict['lidar_points']['lidar_path']
for cam_id, img_info in input_dict['images'].items():
if 'img_path' in img_info:
assert data_prefix['img'] in img_info['img_path']
assert data_root in img_info['img_path']
ann_info = kitti_dataset.parse_ann_info(input_dict)
# assert the keys in ann_info and the type
assert 'gt_labels' in ann_info
assert ann_info['gt_labels'].dtype == np.int64
# only one instance
assert len(ann_info['gt_labels']) == 1
assert 'gt_labels_3d' in ann_info
assert ann_info['gt_labels_3d'].dtype == np.int64
assert 'gt_bboxes' in ann_info
assert ann_info['gt_bboxes'].dtype == np.float64
assert 'gt_bboxes_3d' in ann_info
assert isinstance(ann_info['gt_bboxes_3d'], LiDARInstance3DBoxes)
assert 'group_id' in ann_info
assert ann_info['group_id'].dtype == np.int64
assert 'occluded' in ann_info
assert ann_info['occluded'].dtype == np.int64
assert 'difficulty' in ann_info
assert ann_info['difficulty'].dtype == np.int64
assert 'num_lidar_pts' in ann_info
assert ann_info['num_lidar_pts'].dtype == np.int64
assert 'truncated' in ann_info
assert ann_info['truncated'].dtype == np.int64
car_kitti_dataset = KittiDataset(
data_root,
ann_file,
data_prefix=dict(
pts='training/velodyne_reduced',
img='training/image_2',
),
pipeline=pipeline,
metainfo=dict(CLASSES=['Car']),
modality=modality)
input_dict = car_kitti_dataset.get_data_info(0)
ann_info = car_kitti_dataset.parse_ann_info(input_dict)
# assert the keys in ann_info and the type
assert 'gt_labels' in ann_info
assert ann_info['gt_labels'].dtype == np.int64
# all instances have been filtered out by the selected classes
assert len(ann_info['gt_labels']) == 0
assert len(car_kitti_dataset.metainfo['CLASSES']) == 1
# Copyright (c) OpenMMLab. All rights reserved.
"""Convert the annotation pkl to the standard format in OpenMMLab V2.0.
Example:
python tools/data_converter/update_infos_to_v2.py
--pkl ./data/kitti/kitti_infos_train.pkl
--out-dir ./kitti_v2/
"""
import argparse
import copy
import time
from os import path as osp
import mmcv
import numpy as np
def get_empty_instance():
"""Empty annotation for single instance."""
instance = dict(
# (list[float], required): list of 4 numbers representing
# the bounding box of the instance, in (x1, y1, x2, y2) order.
bbox=None,
# (int, required): an integer in the range
# [0, num_categories-1] representing the category label.
bbox_label=None,
# (list[float], optional): list of 7 (or 9) numbers representing
# the 3D bounding box of the instance,
# in [x, y, z, w, h, l, yaw]
# (or [x, y, z, w, h, l, yaw, vx, vy]) order.
bbox_3d=None,
# (bool, optional): Whether to use the
# 3D bounding box during training.
bbox_3d_isvalid=None,
# (int, optional): 3D category label
# (typically the same as label).
bbox_label_3d=None,
# (float, optional): Projected center depth of the
# 3D bounding box compared to the image plane.
depth=None,
# (list[float], optional): Projected
# 2D center of the 3D bounding box.
center_2d=None,
# (int, optional): Attribute labels
# (fine-grained labels such as stopping, moving, ignore, crowd).
attr_label=None,
# (int, optional): The number of LiDAR
# points in the 3D bounding box.
num_lidar_pts=None,
# (int, optional): The number of Radar
# points in the 3D bounding box.
num_radar_pts=None,
# (int, optional): Difficulty level of
# detecting the 3D bounding box.
difficulty=None,
unaligned_bbox_3d=None)
return instance
def get_empty_lidar_points():
lidar_points = dict(
# (int, optional) : Number of features for each point.
num_pts_feats=None,
# (str, optional): Path of LiDAR data file.
lidar_path=None,
# (list[list[float]]): Transformation matrix from lidar
# or depth to image with shape [4, 4].
lidar2img=None,
# (list[list[float]], optional): Transformation matrix
# from lidar to ego-vehicle
# with shape [4, 4].
# (Referenced camera coordinate system is ego in KITTI.)
lidar2ego=None,
)
return lidar_points
def get_empty_radar_points():
radar_points = dict(
# (int, optional) : Number of features for each point.
num_pts_feats=None,
# (str, optional): Path of RADAR data file.
radar_path=None,
# Transformation matrix from radar to
# ego-vehicle with shape [4, 4].
# (Referenced camera coordinate system is ego in KITTI.)
radar2ego=None,
)
return radar_points
def get_empty_img_info():
img_info = dict(
# (str, required): the path to the image file.
img_path=None,
# (int) The height of the image.
height=None,
# (int) The width of the image.
width=None,
# (str, optional): Path of the depth map file
depth_map=None,
# (list[list[float]], optional) : Transformation
# matrix from camera to image with
# shape [3, 3], [3, 4] or [4, 4].
cam2img=None,
# (list[list[float]], optional) : Transformation
# matrix from camera to ego-vehicle
# with shape [4, 4].
cam2ego=None)
return img_info
def get_single_image_sweep():
single_image_sweep = dict(
# (float, optional) : Timestamp of the current frame.
timestamp=None,
# (list[list[float]], optional) : Transformation matrix
# from ego-vehicle to the global frame with shape [4, 4].
ego2global=None,
# (dict): Information of images captured by multiple cameras
images=dict(
CAM0=get_empty_img_info(),
CAM1=get_empty_img_info(),
CAM2=get_empty_img_info(),
CAM3=get_empty_img_info(),
))
return single_image_sweep
def get_single_lidar_sweep():
single_lidar_sweep = dict(
# (float, optional) : Timestamp of the current frame.
timestamp=None,
# (list[list[float]], optional) : Transformation matrix
# from ego-vehicle to the global frame with shape [4, 4].
ego2global=None,
# (dict): Information of the LiDAR point cloud frame
lidar_points=get_empty_lidar_points())
return single_lidar_sweep
def get_empty_standard_data_info():
data_info = dict(
# (str): Sample id of the frame.
sample_id=None,
# (str, optional): '000010'
token=None,
**get_single_image_sweep(),
# (dict, optional): dict contains information
# of LiDAR point cloud frame.
lidar_points=get_empty_lidar_points(),
# (dict, optional) Each dict contains
# information of Radar point cloud frame.
radar_points=get_empty_radar_points(),
# (list[dict], optional): Image sweeps data.
image_sweeps=[],
instances=[],
# (list[dict], optional): Required by object
# detection; instances to be ignored during training.
instances_ignore=[],
# (str, optional): Path of semantic labels for each point.
pts_semantic_mask_path=None,
# (str, optional): Path of instance labels for each point.
pts_instance_mask_path=None)
return data_info
def clear_instance_unused_keys(instance):
keys = list(instance.keys())
for k in keys:
if instance[k] is None:
del instance[k]
return instance
def clear_data_info_unused_keys(data_info):
keys = list(data_info.keys())
empty_flag = True
for key in keys:
# we allow no annotations in datainfo
if key == 'instances':
empty_flag = False
continue
if isinstance(data_info[key], list):
if len(data_info[key]) == 0:
del data_info[key]
else:
empty_flag = False
elif data_info[key] is None:
del data_info[key]
elif isinstance(data_info[key], dict):
_, sub_empty_flag = clear_data_info_unused_keys(data_info[key])
if sub_empty_flag is False:
empty_flag = False
else:
# sub field is empty
del data_info[key]
else:
empty_flag = False
return data_info, empty_flag
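A quick illustration (hypothetical data) of the recursive cleanup above:

info = dict(
    sample_id='000000',
    token=None,                          # None values are dropped
    radar_points=dict(radar2ego=None),   # empty sub-dicts are dropped
    instances=[])                        # 'instances' is always kept
info, empty = clear_data_info_unused_keys(info)
# info -> {'sample_id': '000000', 'instances': []}, empty -> False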
def update_kitti_infos(pkl_path, out_dir):
print(f'{pkl_path} will be modified.')
if out_dir in pkl_path:
print(f'Warning, you may be overwriting '
f'the original data {pkl_path}.')
time.sleep(5)
# TODO update to full label
# TODO discuss how to process 'Van', 'DontCare'
METAINFO = {
'CLASSES': ('Pedestrian', 'Cyclist', 'Car'),
}
print(f'Reading from input file: {pkl_path}.')
data_list = mmcv.load(pkl_path)
print('Start updating:')
converted_list = []
for ori_info_dict in mmcv.track_iter_progress(data_list):
temp_data_info = get_empty_standard_data_info()
if 'plane' in ori_info_dict:
temp_data_info['plane'] = ori_info_dict['plane']
temp_data_info['sample_id'] = ori_info_dict['image']['image_idx']
temp_data_info['images']['CAM0']['cam2img'] = ori_info_dict['calib'][
'P0'].tolist()
temp_data_info['images']['CAM1']['cam2img'] = ori_info_dict['calib'][
'P1'].tolist()
temp_data_info['images']['CAM2']['cam2img'] = ori_info_dict['calib'][
'P2'].tolist()
temp_data_info['images']['CAM3']['cam2img'] = ori_info_dict['calib'][
'P3'].tolist()
temp_data_info['images']['CAM2']['img_path'] = ori_info_dict['image'][
'image_path'].split('/')[-1]
h, w = ori_info_dict['image']['image_shape']
temp_data_info['images']['CAM2']['height'] = h
temp_data_info['images']['CAM2']['width'] = w
temp_data_info['lidar_points']['num_pts_feats'] = ori_info_dict[
'point_cloud']['num_features']
temp_data_info['lidar_points']['lidar_path'] = ori_info_dict[
'point_cloud']['velodyne_path'].split('/')[-1]
rect = ori_info_dict['calib']['R0_rect'].astype(np.float32)
Trv2c = ori_info_dict['calib']['Tr_velo_to_cam'].astype(np.float32)
lidar2cam = rect @ Trv2c
temp_data_info['images']['CAM2']['lidar2cam'] = lidar2cam.tolist()
temp_data_info['lidar_points']['Tr_velo_to_cam'] = Trv2c.tolist()
# for potential usage
temp_data_info['images']['R0_rect'] = ori_info_dict['calib'][
'R0_rect'].astype(np.float32).tolist()
temp_data_info['lidar_points']['Tr_imu_to_velo'] = ori_info_dict[
'calib']['Tr_imu_to_velo'].astype(np.float32).tolist()
anns = ori_info_dict['annos']
num_instances = len(anns['name'])
ignore_class_name = set()
instance_list = []
for instance_id in range(num_instances):
empty_instance = get_empty_instance()
empty_instance['bbox'] = anns['bbox'][instance_id].tolist()
if anns['name'][instance_id] in METAINFO['CLASSES']:
empty_instance['bbox_label'] = METAINFO['CLASSES'].index(
anns['name'][instance_id])
else:
ignore_class_name.add(anns['name'][instance_id])
empty_instance['bbox_label'] = -1
loc = anns['location'][instance_id]
dims = anns['dimensions'][instance_id]
rots = anns['rotation_y'][:, None][instance_id]
gt_bboxes_3d = np.concatenate([loc, dims,
rots]).astype(np.float32).tolist()
empty_instance['bbox_3d'] = gt_bboxes_3d
empty_instance['bbox_label_3d'] = copy.deepcopy(
empty_instance['bbox_label'])
empty_instance['truncated'] = int(
anns['truncated'][instance_id].tolist())
empty_instance['occluded'] = anns['occluded'][instance_id].tolist()
empty_instance['alpha'] = anns['alpha'][instance_id].tolist()
empty_instance['score'] = anns['score'][instance_id].tolist()
empty_instance['index'] = anns['index'][instance_id].tolist()
empty_instance['group_id'] = anns['group_ids'][instance_id].tolist()
empty_instance['difficulty'] = anns['difficulty'][
instance_id].tolist()
empty_instance['num_lidar_pts'] = anns['num_points_in_gt'][
instance_id].tolist()
empty_instance = clear_instance_unused_keys(empty_instance)
instance_list.append(empty_instance)
temp_data_info['instances'] = instance_list
temp_data_info, _ = clear_data_info_unused_keys(temp_data_info)
converted_list.append(temp_data_info)
pkl_name = pkl_path.split('/')[-1]
out_path = osp.join(out_dir, pkl_name)
print(f'Writing to output file: {out_path}.')
print(f'ignore classes: {ignore_class_name}')
converted_data_info = dict(
metainfo={'DATASET': 'KITTI'}, data_list=converted_list)
mmcv.dump(converted_data_info, out_path, 'pkl')
def parse_args():
parser = argparse.ArgumentParser(description='Arg parser for updating '
'info pkls to the OpenMMLab V2.0 format.')
parser.add_argument(
'--dataset', type=str, default='kitti', help='name of dataset')
parser.add_argument(
'--pkl',
type=str,
default='./data/kitti/kitti_infos_train.pkl',
help='specify the path of the info pkl')
parser.add_argument(
'--out-dir',
type=str,
default='converted_annotations',
required=False,
help='output directory of the info pkl')
args = parser.parse_args()
return args
def main():
args = parse_args()
if args.out_dir is None:
args.out_dir = args.root_dir
if args.dataset == 'kitti':
update_kitti_infos(pkl_path=args.pkl, out_dir=args.out_dir)
if __name__ == '__main__':
main()