Commit 43243598 authored by liyinhao

Finish the indoor dataset, ScanNet dataset and SUN RGB-D dataset

parent faa0a6c4
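For context, a minimal sketch of how the refactored datasets would be built through the registry. The data paths, info-file name and pipeline arguments below are illustrative assumptions, not part of this commit; only the dataset type and constructor arguments come from the code in this change.

from mmdet3d.datasets import build_dataset

# Hypothetical training config for the new ScannetDataset.
scannet_train = dict(
    type='ScannetDataset',
    root_path='data/scannet',                          # expects scannet_train_instance_data/ inside
    ann_file='data/scannet/scannet_infos_train.pkl',   # placeholder info-file name
    pipeline=[
        # transform arguments omitted; names taken from this package's __all__
        dict(type='IndoorLoadPointsFromFile'),
        dict(type='IndoorPointSample'),
        dict(type='IndoorLoadAnnotations3D'),
    ],
    training=True)

dataset = build_dataset(scannet_train)
print(len(dataset))   # number of annotated scans in the info file
sample = dataset[0]   # dict produced by the training pipeline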
from mmdet.datasets.builder import DATASETS
from .builder import build_dataset
from .dataset_wrappers import RepeatFactorDataset
from .indoor_dataset import IndoorDataset
from .kitti2d_dataset import Kitti2DDataset
from .kitti_dataset import KittiDataset
from .loader import DistributedGroupSampler, GroupSampler, build_dataloader
@@ -23,5 +24,5 @@ __all__ = [
'IndoorLoadPointsFromFile', 'IndoorPointsColorNormalize',
'IndoorPointSample', 'IndoorLoadAnnotations3D', 'IndoorPointsColorJitter',
'IndoorGlobalRotScale', 'IndoorFlipData', 'SunrgbdDataset',
- 'ScannetDataset'
+ 'ScannetDataset', 'IndoorDataset'
]
import copy
import mmcv
import numpy as np
import torch.utils.data as torch_data
from mmdet.datasets import DATASETS
from .pipelines import Compose
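# IndoorDataset factors out the loading, random re-sampling and evaluation
# logic shared by the indoor datasets; subclasses such as ScannetDataset and
# SunrgbdDataset are expected to provide the ``CLASSES`` / ``class2type``
# attributes and the ``_get_pts_filename`` / ``_get_ann_info`` hooks used below.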
@DATASETS.register_module()
class IndoorDataset(torch_data.Dataset):
def __init__(self,
root_path,
ann_file,
pipeline=None,
training=False,
class_names=None,
test_mode=False,
with_label=True):
super().__init__()
self.root_path = root_path
self.class_names = class_names if class_names else self.CLASSES
self.test_mode = test_mode
self.training = training
self.mode = 'TRAIN' if self.training else 'TEST'
mmcv.check_file_exist(ann_file)
self.infos = mmcv.load(ann_file)
# dataset config
self.num_class = len(self.class_names)
self.pcd_limit_range = [0, -40, -3.0, 70.4, 40, 3.0]
if pipeline is not None:
self.pipeline = Compose(pipeline)
self.with_label = with_label
def __getitem__(self, idx):
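# Test mode returns the sample directly; in training, keep drawing another
# index from the same group until a sample with valid annotations is found.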
if self.test_mode:
return self._prepare_test_data(idx)
while True:
data = self._prepare_train_data(idx)
if data is None:
idx = self._rand_another(idx)
continue
return data
def _prepare_test_data(self, index):
input_dict = self._get_sensor_data(index)
example = self.pipeline(input_dict)
return example
def _prepare_train_data(self, index):
input_dict = self._get_sensor_data(index)
input_dict = self._train_pre_pipeline(input_dict)
if input_dict is None:
return None
example = self.pipeline(input_dict)
if len(example['gt_bboxes_3d']._data) == 0:
return None
return example
def _train_pre_pipeline(self, input_dict):
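# Drop training samples that contain no ground-truth boxes.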
if len(input_dict['gt_bboxes_3d']) == 0:
return None
return input_dict
def _get_sensor_data(self, index):
info = self.infos[index]
sample_idx = info['point_cloud']['lidar_idx']
pts_filename = self._get_pts_filename(sample_idx)
input_dict = dict(pts_filename=pts_filename)
if self.with_label:
annos = self._get_ann_info(index, sample_idx)
input_dict.update(annos)
return input_dict
def _rand_another(self, idx):
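# ``self.flag`` (per-sample group flags, as used by GroupSampler) is assumed
# to be set elsewhere; it is not initialized in this class.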
pool = np.where(self.flag == self.flag[idx])[0]
return np.random.choice(pool)
def _generate_annotations(self, output):
"""Generate Annotations.
Transform results of the model to the form of the evaluation.
Args:
output (List): The output of the model.
"""
result = []
bs = len(output)
for i in range(bs):
pred_list_i = list()
pred_boxes = output[i]
box3d_depth = pred_boxes['box3d_lidar']
if box3d_depth is not None:
label_preds = pred_boxes['label_preds']
scores = pred_boxes['scores'].detach().cpu().numpy()
label_preds = label_preds.detach().cpu().numpy()
num_proposal = box3d_depth.shape[0]
for j in range(num_proposal):
bbox_lidar = box3d_depth[j] # [7] in lidar
bbox_lidar_bottom = bbox_lidar.copy()
pred_list_i.append(
(label_preds[j], bbox_lidar_bottom, scores[j]))
result.append(pred_list_i)
else:
result.append(pred_list_i)
return result
def format_results(self, outputs):
results = []
for output in outputs:
result = self._generate_annotations(output)
results.append(result)
return results
def evaluate(self, results, metric=None):
"""Evaluate.
Evaluation in indoor protocol.
Args:
results (List): List of result.
metric (List[float]): AP IoU thresholds.
"""
results = self.format_results(results)
from mmdet3d.core.evaluation import indoor_eval
assert len(metric) > 0
gt_annos = [copy.deepcopy(info['annos']) for info in self.infos]
ap_result_str, ap_dict = indoor_eval(gt_annos, results, metric,
self.class2type)
return ap_dict
def __len__(self):
return len(self.infos)
import copy
import os
import os.path as osp
import mmcv
import numpy as np
import torch.utils.data as torch_data
from mmdet.datasets import DATASETS
from .pipelines import Compose
from .indoor_dataset import IndoorDataset
@DATASETS.register_module()
- class ScannetDataset(torch_data.Dataset):
+ class ScannetDataset(IndoorDataset):
class2type = {
0: 'cabinet',
1: 'bed',
@@ -46,82 +42,19 @@ class ScannetDataset(torch_data.Dataset):
class_names=None,
test_mode=False,
with_label=True):
super().__init__()
self.root_path = root_path
self.class_names = class_names if class_names else self.CLASSES
super().__init__(root_path, ann_file, pipeline, training, class_names,
test_mode, with_label)
self.data_path = osp.join(root_path, 'scannet_train_instance_data')
self.test_mode = test_mode
self.training = training
self.mode = 'TRAIN' if self.training else 'TEST'
mmcv.check_file_exist(ann_file)
self.scannet_infos = mmcv.load(ann_file)
# dataset config
self.num_class = len(self.class_names)
self.pcd_limit_range = [0, -40, -3.0, 70.4, 40, 3.0]
self.nyu40ids = np.array(
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 16, 24, 28, 33, 34, 36, 39])
self.nyu40id2class = {
nyu40id: i
for i, nyu40id in enumerate(list(self.nyu40ids))
}
if pipeline is not None:
self.pipeline = Compose(pipeline)
self.with_label = with_label
def __getitem__(self, idx):
if self.test_mode:
return self._prepare_test_data(idx)
while True:
data = self._prepare_train_data(idx)
if data is None:
idx = self._rand_another(idx)
continue
return data
def _prepare_test_data(self, index):
input_dict = self._get_sensor_data(index)
example = self.pipeline(input_dict)
return example
def _prepare_train_data(self, index):
input_dict = self._get_sensor_data(index)
input_dict = self._train_pre_pipeline(input_dict)
if input_dict is None:
return None
example = self.pipeline(input_dict)
if len(example['gt_bboxes_3d']._data) == 0:
return None
return example
def _train_pre_pipeline(self, input_dict):
if len(input_dict['gt_bboxes_3d']) == 0:
return None
return input_dict
def _get_sensor_data(self, index):
info = self.scannet_infos[index]
sample_idx = info['point_cloud']['lidar_idx']
pts_filename = self._get_pts_filename(sample_idx)
input_dict = dict(pts_filename=pts_filename)
if self.with_label:
annos = self._get_ann_info(index, sample_idx)
input_dict.update(annos)
return input_dict
def _get_pts_filename(self, sample_idx):
- pts_filename = os.path.join(self.data_path, f'{sample_idx}_vert.npy')
+ pts_filename = osp.join(self.data_path, f'{sample_idx}_vert.npy')
mmcv.check_file_exist(pts_filename)
return pts_filename
def _get_ann_info(self, index, sample_idx):
# Use index to fetch the annotations so that the eval hook can also use this API.
- info = self.scannet_infos[index]
+ info = self.infos[index]
if info['annos']['gt_num'] != 0:
gt_bboxes_3d = info['annos']['gt_boxes_upright_depth'] # k, 6
gt_labels = info['annos']['class']
@@ -142,66 +75,3 @@ class ScannetDataset(torch_data.Dataset):
pts_instance_mask_path=pts_instance_mask_path,
pts_semantic_mask_path=pts_semantic_mask_path)
return anns_results
def _rand_another(self, idx):
pool = np.where(self.flag == self.flag[idx])[0]
return np.random.choice(pool)
def _generate_annotations(self, output):
"""Generate Annotations.
Transform results of the model to the form of the evaluation.
Args:
output (List): The output of the model.
"""
result = []
bs = len(output)
for i in range(bs):
pred_list_i = list()
pred_boxes = output[i]
box3d_depth = pred_boxes['box3d_lidar']
if box3d_depth is not None:
label_preds = pred_boxes['label_preds']
scores = pred_boxes['scores'].detach().cpu().numpy()
label_preds = label_preds.detach().cpu().numpy()
num_proposal = box3d_depth.shape[0]
for j in range(num_proposal):
bbox_lidar = box3d_depth[j] # [7] in lidar
bbox_lidar_bottom = bbox_lidar.copy()
pred_list_i.append(
(label_preds[j], bbox_lidar_bottom, scores[j]))
result.append(pred_list_i)
else:
result.append(pred_list_i)
return result
def format_results(self, outputs):
results = []
for output in outputs:
result = self._generate_annotations(output)
results.append(result)
return results
def evaluate(self, results, metric=None):
"""Evaluate.
Evaluation in indoor protocol.
Args:
results (List): List of result.
metric (List[float]): AP IoU thresholds.
"""
results = self.format_results(results)
from mmdet3d.core.evaluation import indoor_eval
assert len(metric) > 0
gt_annos = [
copy.deepcopy(info['annos']) for info in self.scannet_infos
]
ap_result_str, ap_dict = indoor_eval(gt_annos, results, metric,
self.class2type)
return ap_dict
def __len__(self):
return len(self.scannet_infos)
import copy
import os
import os.path as osp
import mmcv
import numpy as np
import torch.utils.data as torch_data
from mmdet.datasets import DATASETS
from .pipelines import Compose
from .indoor_dataset import IndoorDataset
@DATASETS.register_module()
- class SunrgbdDataset(torch_data.Dataset):
+ class SunrgbdDataset(IndoorDataset):
class2type = {
0: 'bed',
@@ -36,78 +33,19 @@ class SunrgbdDataset(torch_data.Dataset):
class_names=None,
test_mode=False,
with_label=True):
super().__init__()
self.root_path = root_path
self.class_names = class_names if class_names else self.CLASSES
super().__init__(root_path, ann_file, pipeline, training, class_names,
test_mode, with_label)
self.data_path = osp.join(root_path, 'sunrgbd_trainval')
self.test_mode = test_mode
self.training = training
self.mode = 'TRAIN' if self.training else 'TEST'
mmcv.check_file_exist(ann_file)
self.sunrgbd_infos = mmcv.load(ann_file)
# dataset config
self.num_class = len(self.class_names)
self.pcd_limit_range = [0, -40, -3.0, 70.4, 40, 3.0]
if pipeline is not None:
self.pipeline = Compose(pipeline)
self.with_label = with_label
def __getitem__(self, idx):
if self.test_mode:
return self._prepare_test_data(idx)
while True:
data = self._prepare_train_data(idx)
if data is None:
idx = self._rand_another(idx)
continue
return data
def _prepare_test_data(self, index):
input_dict = self._get_sensor_data(index)
example = self.pipeline(input_dict)
return example
def _prepare_train_data(self, index):
input_dict = self._get_sensor_data(index)
input_dict = self._train_pre_pipeline(input_dict)
if input_dict is None:
return None
example = self.pipeline(input_dict)
if len(example['gt_bboxes_3d']._data) == 0:
return None
return example
def _train_pre_pipeline(self, input_dict):
if len(input_dict['gt_bboxes_3d']) == 0:
return None
return input_dict
def _get_sensor_data(self, index):
info = self.sunrgbd_infos[index]
sample_idx = info['point_cloud']['lidar_idx']
pts_filename = self._get_pts_filename(sample_idx)
input_dict = dict(pts_filename=pts_filename)
if self.with_label:
annos = self._get_ann_info(index, sample_idx)
input_dict.update(annos)
return input_dict
def _get_pts_filename(self, sample_idx):
- pts_filename = os.path.join(self.data_path, 'lidar',
- f'{sample_idx:06d}.npy')
+ pts_filename = osp.join(self.data_path, 'lidar',
+ f'{sample_idx:06d}.npy')
mmcv.check_file_exist(pts_filename)
return pts_filename
def _get_ann_info(self, index, sample_idx):
# Use index to fetch the annotations so that the eval hook can also use this API.
- info = self.sunrgbd_infos[index]
+ info = self.infos[index]
if info['annos']['gt_num'] != 0:
gt_bboxes_3d = info['annos']['gt_boxes_upright_depth'] # k, 6
gt_labels = info['annos']['class']
@@ -122,66 +60,3 @@ class SunrgbdDataset(torch_data.Dataset):
gt_labels=gt_labels,
gt_bboxes_3d_mask=gt_bboxes_3d_mask)
return anns_results
def _rand_another(self, idx):
pool = np.where(self.flag == self.flag[idx])[0]
return np.random.choice(pool)
def _generate_annotations(self, output):
"""Generate Annotations.
Transform results of the model to the form of the evaluation.
Args:
output (List): The output of the model.
"""
result = []
bs = len(output)
for i in range(bs):
pred_list_i = list()
pred_boxes = output[i]
box3d_depth = pred_boxes['box3d_lidar']
if box3d_depth is not None:
label_preds = pred_boxes['label_preds']
scores = pred_boxes['scores'].detach().cpu().numpy()
label_preds = label_preds.detach().cpu().numpy()
num_proposal = box3d_depth.shape[0]
for j in range(num_proposal):
bbox_lidar = box3d_depth[j] # [7] in lidar
bbox_lidar_bottom = bbox_lidar.copy()
pred_list_i.append(
(label_preds[j], bbox_lidar_bottom, scores[j]))
result.append(pred_list_i)
else:
result.append(pred_list_i)
return result
def format_results(self, outputs):
results = []
for output in outputs:
result = self._generate_annotations(output)
results.append(result)
return results
def evaluate(self, results, metric):
"""Evaluate.
Evaluation in indoor protocol.
Args:
results (List): List of result.
metric (List[float]): AP IoU thresholds.
"""
results = self.format_results(results)
from mmdet3d.core.evaluation import indoor_eval
assert len(metric) > 0
gt_annos = [
copy.deepcopy(info['annos']) for info in self.sunrgbd_infos
]
ap_result_str, ap_dict = indoor_eval(gt_annos, results, metric,
self.class2type)
return ap_dict
def __len__(self):
return len(self.sunrgbd_infos)