Commit afa6adf1 authored by jihanyang's avatar jihanyang
Browse files

support lyft dataset

parent eed89a4d
...@@ -9,13 +9,15 @@ from .kitti.kitti_dataset import KittiDataset ...@@ -9,13 +9,15 @@ from .kitti.kitti_dataset import KittiDataset
from .nuscenes.nuscenes_dataset import NuScenesDataset from .nuscenes.nuscenes_dataset import NuScenesDataset
from .waymo.waymo_dataset import WaymoDataset from .waymo.waymo_dataset import WaymoDataset
from .pandaset.pandaset_dataset import PandasetDataset from .pandaset.pandaset_dataset import PandasetDataset
from .lyft.lyft_dataset import LyftDataset
__all__ = { __all__ = {
'DatasetTemplate': DatasetTemplate, 'DatasetTemplate': DatasetTemplate,
'KittiDataset': KittiDataset, 'KittiDataset': KittiDataset,
'NuScenesDataset': NuScenesDataset, 'NuScenesDataset': NuScenesDataset,
'WaymoDataset': WaymoDataset, 'WaymoDataset': WaymoDataset,
'PandasetDataset': PandasetDataset 'PandasetDataset': PandasetDataset,
'LyftDataset': LyftDataset
} }
......
...@@ -147,6 +147,7 @@ class DatasetTemplate(torch_data.Dataset): ...@@ -147,6 +147,7 @@ class DatasetTemplate(torch_data.Dataset):
max_sweeps = self.dataset_cfg.get('MAX_SWEEPS', 1) max_sweeps = self.dataset_cfg.get('MAX_SWEEPS', 1)
idx = self.dataset_cfg.POINT_FEATURE_ENCODING.get('src_feature_list').index('timestamp') idx = self.dataset_cfg.POINT_FEATURE_ENCODING.get('src_feature_list').index('timestamp')
dt = np.round(data_dict['points'][:, idx], 2) dt = np.round(data_dict['points'][:, idx], 2)
if np.unique(dt).shape[0] == max_sweeps:
max_dt = sorted(np.unique(dt))[max_sweeps-1] max_dt = sorted(np.unique(dt))[max_sweeps-1]
data_dict['points'] = data_dict['points'][dt <= max_dt] data_dict['points'] = data_dict['points'][dt <= max_dt]
......
...@@ -12,6 +12,11 @@ def transform_annotations_to_kitti_format(annos, map_name_to_kitti=None, info_wi ...@@ -12,6 +12,11 @@ def transform_annotations_to_kitti_format(annos, map_name_to_kitti=None, info_wi
""" """
for anno in annos: for anno in annos:
# For lyft and nuscenes, different anno key in info
if 'name' not in anno:
anno['name'] = anno['gt_names']
anno.pop('gt_names')
for k in range(anno['name'].shape[0]): for k in range(anno['name'].shape[0]):
anno['name'][k] = map_name_to_kitti[anno['name'][k]] anno['name'][k] = map_name_to_kitti[anno['name'][k]]
......
import copy
import pickle
from pathlib import Path
import numpy as np
from tqdm import tqdm
from ...ops.roiaware_pool3d import roiaware_pool3d_utils
from ...utils import common_utils, box_utils
from ..dataset import DatasetTemplate
class LyftDataset(DatasetTemplate):
    """Lyft Level 5 perception dataset in OpenPCDet format.

    Loads pickled sample infos, aggregates multi-sweep lidar clouds, and
    provides kitti-style and lyft-style evaluation entry points.
    """

    def __init__(self, dataset_cfg, class_names, training=True, root_path=None, logger=None):
        """
        Args:
            dataset_cfg: dataset config; must provide DATA_PATH, VERSION, INFO_PATH, MAX_SWEEPS.
            class_names: list of class names used for detection.
            training: whether this instance serves the training split.
            root_path: optional dataset root overriding dataset_cfg.DATA_PATH.
            logger: logger for progress messages.
        """
        self.root_path = (root_path if root_path is not None else Path(dataset_cfg.DATA_PATH)) / dataset_cfg.VERSION
        super().__init__(
            dataset_cfg=dataset_cfg, class_names=class_names, training=training, root_path=self.root_path, logger=logger
        )
        self.infos = []
        self.include_lyft_data(self.mode)

    def include_lyft_data(self, mode):
        """Load the pickled info files listed in dataset_cfg.INFO_PATH[mode] into self.infos."""
        self.logger.info('Loading lyft dataset')
        lyft_infos = []

        for info_path in self.dataset_cfg.INFO_PATH[mode]:
            info_path = self.root_path / info_path
            if not info_path.exists():
                # Missing info files are skipped so partially prepared setups still load.
                continue
            with open(info_path, 'rb') as f:
                infos = pickle.load(f)
                lyft_infos.extend(infos)

        self.infos.extend(lyft_infos)
        self.logger.info('Total samples for lyft dataset: %d' % (len(lyft_infos)))

    @staticmethod
    def remove_ego_points(points, center_radius=1.0):
        """Drop lidar returns that hit the ego vehicle itself.

        Args:
            points: (N, >=2) array with x, y in the first two columns.
            center_radius: half-width of the exclusion box in y; x uses 1.5x that.

        Returns:
            Filtered copy of `points`.
        """
        mask = ~((np.abs(points[:, 0]) < center_radius * 1.5) & (np.abs(points[:, 1]) < center_radius))
        return points[mask]

    def get_sweep(self, sweep_info):
        """Load one past lidar sweep and transform it into the keyframe coordinate system.

        Returns:
            points_sweep: (N, 4) array [x, y, z, intensity] in the reference frame.
            cur_times: (N, 1) time lag of every point relative to the keyframe.
        """
        lidar_path = self.root_path / sweep_info['lidar_path']
        points_sweep = np.fromfile(str(lidar_path), dtype=np.float32, count=-1)
        if points_sweep.shape[0] % 5 != 0:
            # Some lyft .bin files are truncated; drop the trailing partial record.
            points_sweep = points_sweep[: points_sweep.shape[0] - (points_sweep.shape[0] % 5)]
        points_sweep = points_sweep.reshape([-1, 5])[:, :4]
        points_sweep = self.remove_ego_points(points_sweep).T
        if sweep_info['transform_matrix'] is not None:
            # Homogeneous transform of xyz into the keyframe sensor frame.
            num_points = points_sweep.shape[1]
            points_sweep[:3, :] = sweep_info['transform_matrix'].dot(
                np.vstack((points_sweep[:3, :], np.ones(num_points))))[:3, :]

        cur_times = sweep_info['time_lag'] * np.ones((1, points_sweep.shape[1]))
        return points_sweep.T, cur_times.T

    def get_lidar_with_sweeps(self, index, max_sweeps=1):
        """Load the keyframe cloud plus up to (max_sweeps - 1) randomly chosen past sweeps.

        Returns:
            (N, 5) array [x, y, z, intensity, time_lag]; time_lag is 0 for keyframe points.
        """
        info = self.infos[index]
        lidar_path = self.root_path / info['lidar_path']
        points = np.fromfile(str(lidar_path), dtype=np.float32, count=-1)
        if points.shape[0] % 5 != 0:
            points = points[: points.shape[0] - (points.shape[0] % 5)]
        points = points.reshape([-1, 5])[:, :4]

        sweep_points_list = [points]
        sweep_times_list = [np.zeros((points.shape[0], 1))]

        # BUG FIX: np.random.choice(..., replace=False) raises ValueError when
        # asked for more samples than the population; some infos carry fewer
        # sweeps than MAX_SWEEPS - 1, so cap the draw size.
        num_choices = min(max_sweeps - 1, len(info['sweeps']))
        for k in np.random.choice(len(info['sweeps']), num_choices, replace=False):
            points_sweep, times_sweep = self.get_sweep(info['sweeps'][k])
            sweep_points_list.append(points_sweep)
            sweep_times_list.append(times_sweep)

        points = np.concatenate(sweep_points_list, axis=0)
        times = np.concatenate(sweep_times_list, axis=0).astype(points.dtype)

        points = np.concatenate((points, times), axis=1)
        return points

    def __len__(self):
        if self._merge_all_iters_to_one_epoch:
            # One "epoch" covers every sample total_epochs times.
            return len(self.infos) * self.total_epochs
        return len(self.infos)

    def __getitem__(self, index):
        """Build one training/eval sample dict and run the common pre-processing."""
        if self._merge_all_iters_to_one_epoch:
            index = index % len(self.infos)

        info = copy.deepcopy(self.infos[index])
        points = self.get_lidar_with_sweeps(index, max_sweeps=self.dataset_cfg.MAX_SWEEPS)

        input_dict = {
            'points': points,
            'frame_id': Path(info['lidar_path']).stem,
            'metadata': {'token': info['token']}
        }

        if 'gt_boxes' in info:
            input_dict.update({
                'gt_boxes': info['gt_boxes'],
                'gt_names': info['gt_names']
            })

        data_dict = self.prepare_data(data_dict=input_dict)

        return data_dict

    def generate_prediction_dicts(self, batch_dict, pred_dicts, class_names, output_path=None):
        """
        Args:
            batch_dict:
                frame_id:
            pred_dicts: list of pred_dicts
                pred_boxes: (N, 7), Tensor
                pred_scores: (N), Tensor
                pred_labels: (N), Tensor
            class_names:
            output_path:
        Returns:
            List of per-sample annotation dicts in numpy form.
        """
        def get_template_prediction(num_samples):
            # Empty-shaped placeholder so downstream code always sees the same keys.
            ret_dict = {
                'name': np.zeros(num_samples), 'score': np.zeros(num_samples),
                'boxes_lidar': np.zeros([num_samples, 7]), 'pred_labels': np.zeros(num_samples)
            }
            return ret_dict

        def generate_single_sample_dict(box_dict):
            # Move one sample's predictions to CPU numpy and map labels to names.
            pred_scores = box_dict['pred_scores'].cpu().numpy()
            pred_boxes = box_dict['pred_boxes'].cpu().numpy()
            pred_labels = box_dict['pred_labels'].cpu().numpy()
            pred_dict = get_template_prediction(pred_scores.shape[0])
            if pred_scores.shape[0] == 0:
                return pred_dict

            # Labels are 1-based indices into class_names.
            pred_dict['name'] = np.array(class_names)[pred_labels - 1]
            pred_dict['score'] = pred_scores
            pred_dict['boxes_lidar'] = pred_boxes
            pred_dict['pred_labels'] = pred_labels

            return pred_dict

        annos = []
        for index, box_dict in enumerate(pred_dicts):
            single_pred_dict = generate_single_sample_dict(box_dict)
            single_pred_dict['frame_id'] = batch_dict['frame_id'][index]
            single_pred_dict['metadata'] = batch_dict['metadata'][index]
            annos.append(single_pred_dict)

        return annos

    def kitti_eval(self, eval_det_annos, eval_gt_annos, class_names):
        """Evaluate with the kitti protocol after mapping lyft names to kitti classes."""
        from ..kitti.kitti_object_eval_python import eval as kitti_eval
        from ..kitti import kitti_utils

        map_name_to_kitti = {
            'car': 'Car',
            'pedestrian': 'Pedestrian',
            'truck': 'Truck',
            'bicycle': 'Cyclist',
            'motorcycle': 'Cyclist'
        }

        kitti_utils.transform_to_kitti_format(eval_det_annos, map_name_to_kitti=map_name_to_kitti)
        kitti_utils.transform_to_kitti_format(
            eval_gt_annos, map_name_to_kitti=map_name_to_kitti,
            info_with_fakelidar=self.dataset_cfg.get('INFO_WITH_FAKELIDAR', False)
        )

        kitti_class_names = [map_name_to_kitti[x] for x in class_names]
        ap_result_str, ap_dict = kitti_eval.get_official_eval_result(
            gt_annos=eval_gt_annos, dt_annos=eval_det_annos, current_classes=kitti_class_names
        )
        return ap_result_str, ap_dict

    def evaluation(self, det_annos, class_names, **kwargs):
        """Dispatch to the metric named by kwargs['eval_metric'] ('kitti' or 'lyft')."""
        if kwargs['eval_metric'] == 'kitti':
            eval_det_annos = copy.deepcopy(det_annos)
            eval_gt_annos = copy.deepcopy(self.infos)
            return self.kitti_eval(eval_det_annos, eval_gt_annos, class_names)
        elif kwargs['eval_metric'] == 'lyft':
            return self.lyft_eval(det_annos, class_names,
                                  iou_thresholds=self.dataset_cfg.EVAL_LYFT_IOU_LIST)
        else:
            raise NotImplementedError

    def lyft_eval(self, det_annos, class_names, iou_thresholds=None):
        """Lyft-style mAP averaged over IoU thresholds (default [0.5])."""
        from lyft_dataset_sdk.lyftdataset import LyftDataset as Lyft
        from . import lyft_utils
        # from lyft_dataset_sdk.eval.detection.mAP_evaluation import get_average_precisions
        from .lyft_mAP_eval.lyft_eval import get_average_precisions

        if iou_thresholds is None:
            # Avoid a shared mutable default argument.
            iou_thresholds = [0.5]

        lyft = Lyft(json_path=self.root_path / 'data', data_path=self.root_path, verbose=True)

        det_lyft_boxes, sample_tokens = lyft_utils.convert_det_to_lyft_format(lyft, det_annos)
        gt_lyft_boxes = lyft_utils.load_lyft_gt_by_tokens(lyft, sample_tokens)

        average_precisions = get_average_precisions(gt_lyft_boxes, det_lyft_boxes, class_names, iou_thresholds)

        ap_result_str, ap_dict = lyft_utils.format_lyft_results(
            average_precisions, class_names, iou_thresholds, version=self.dataset_cfg.VERSION
        )

        return ap_result_str, ap_dict

    def create_groundtruth_database(self, used_classes=None, max_sweeps=10):
        """Crop per-object point clouds into a gt database for gt-sampling augmentation.

        Writes one .bin per gt box under gt_database/ plus a
        lyft_dbinfos_{max_sweeps}sweeps.pkl index. Requires CUDA
        (roiaware_pool3d points_in_boxes_gpu).
        """
        import torch

        database_save_path = self.root_path / 'gt_database'
        db_info_save_path = self.root_path / f'lyft_dbinfos_{max_sweeps}sweeps.pkl'

        database_save_path.mkdir(parents=True, exist_ok=True)
        all_db_infos = {}

        for idx in tqdm(range(len(self.infos))):
            sample_idx = idx
            info = self.infos[idx]
            points = self.get_lidar_with_sweeps(idx, max_sweeps=max_sweeps)
            gt_boxes = info['gt_boxes']
            gt_names = info['gt_names']

            # Per-point index of the enclosing gt box (-1 if none).
            box_idxs_of_pts = roiaware_pool3d_utils.points_in_boxes_gpu(
                torch.from_numpy(points[:, 0:3]).unsqueeze(dim=0).float().cuda(),
                torch.from_numpy(gt_boxes[:, 0:7]).unsqueeze(dim=0).float().cuda()
            ).long().squeeze(dim=0).cpu().numpy()

            for i in range(gt_boxes.shape[0]):
                filename = '%s_%s_%d.bin' % (sample_idx, gt_names[i], i)
                filepath = database_save_path / filename
                gt_points = points[box_idxs_of_pts == i]

                # Store points relative to the box center.
                gt_points[:, :3] -= gt_boxes[i, :3]
                # BUG FIX: .bin content is binary; the original opened the file in
                # text mode ('w'), which is wrong for ndarray.tofile.
                with open(filepath, 'wb') as f:
                    gt_points.tofile(f)

                if (used_classes is None) or gt_names[i] in used_classes:
                    db_path = str(filepath.relative_to(self.root_path))  # gt_database/xxxxx.bin
                    db_info = {'name': gt_names[i], 'path': db_path, 'image_idx': sample_idx, 'gt_idx': i,
                               'box3d_lidar': gt_boxes[i], 'num_points_in_gt': gt_points.shape[0]}
                    if gt_names[i] in all_db_infos:
                        all_db_infos[gt_names[i]].append(db_info)
                    else:
                        all_db_infos[gt_names[i]] = [db_info]

        for k, v in all_db_infos.items():
            print('Database %s: %d' % (k, len(v)))

        with open(db_info_save_path, 'wb') as f:
            pickle.dump(all_db_infos, f)
def create_lyft_info(version, data_path, save_path, split, max_sweeps=10):
    """Build and pickle train/val (or test) sample info files for the lyft dataset.

    Args:
        version: one of 'trainval', 'one_scene', 'test'.
        data_path: root directory containing the raw lyft data; '<version>' is appended.
        save_path: directory where the generated .pkl files are written; '<version>' is appended.
        split: optional sub-split directory name under ImageSets and save_path.
        max_sweeps: number of lidar sweeps aggregated per sample info.
    """
    from lyft_dataset_sdk.lyftdataset import LyftDataset
    from . import lyft_utils
    data_path = data_path / version
    save_path = save_path / version
    split_path = data_path.parent / 'ImageSets'

    if split is not None:
        save_path = save_path / split
        split_path = split_path / split
    save_path.mkdir(exist_ok=True)

    assert version in ['trainval', 'one_scene', 'test']

    if version == 'trainval':
        train_split_path = split_path / 'train.txt'
        val_split_path = split_path / 'val.txt'
    elif version == 'test':
        train_split_path = split_path / 'test.txt'
        val_split_path = None
    elif version == 'one_scene':
        train_split_path = split_path / 'one_scene.txt'
        val_split_path = split_path / 'one_scene.txt'
    else:
        raise NotImplementedError

    # read_text() closes the file; the original bare open(...).readlines() leaked handles.
    train_scenes = [x.strip() for x in train_split_path.read_text().splitlines()] \
        if train_split_path.exists() else []
    # BUG FIX: for version == 'test', val_split_path is None and the original
    # called None.exists(), raising AttributeError.
    val_scenes = [x.strip() for x in val_split_path.read_text().splitlines()] \
        if val_split_path is not None and val_split_path.exists() else []

    lyft = LyftDataset(json_path=data_path / 'data', data_path=data_path, verbose=True)

    # Keep only scenes whose lidar data actually exists on disk.
    available_scenes = lyft_utils.get_available_scenes(lyft)
    available_scene_names = [s['name'] for s in available_scenes]
    train_scenes = list(filter(lambda x: x in available_scene_names, train_scenes))
    val_scenes = list(filter(lambda x: x in available_scene_names, val_scenes))
    train_scenes = set([available_scenes[available_scene_names.index(s)]['token'] for s in train_scenes])
    val_scenes = set([available_scenes[available_scene_names.index(s)]['token'] for s in val_scenes])

    print('%s: train scene(%d), val scene(%d)' % (version, len(train_scenes), len(val_scenes)))

    train_lyft_infos, val_lyft_infos = lyft_utils.fill_trainval_infos(
        data_path=data_path, lyft=lyft, train_scenes=train_scenes, val_scenes=val_scenes,
        test='test' in version, max_sweeps=max_sweeps
    )

    if version == 'test':
        print('test sample: %d' % len(train_lyft_infos))
        with open(save_path / 'lyft_infos_test.pkl', 'wb') as f:
            pickle.dump(train_lyft_infos, f)
    else:
        print('train sample: %d, val sample: %d' % (len(train_lyft_infos), len(val_lyft_infos)))
        with open(save_path / 'lyft_infos_train.pkl', 'wb') as f:
            pickle.dump(train_lyft_infos, f)
        with open(save_path / 'lyft_infos_val.pkl', 'wb') as f:
            pickle.dump(val_lyft_infos, f)
if __name__ == '__main__':
    import yaml
    import argparse
    from pathlib import Path
    from easydict import EasyDict

    parser = argparse.ArgumentParser(description='arg parser')
    parser.add_argument('--cfg_file', type=str, default=None, help='specify the config of dataset')
    parser.add_argument('--func', type=str, default='create_lyft_infos', help='')
    parser.add_argument('--version', type=str, default='trainval', help='')
    parser.add_argument('--split', type=str, default=None, help='')
    parser.add_argument('--max_sweeps', type=int, default=10, help='')
    args = parser.parse_args()

    if args.func == 'create_lyft_infos':
        # BUG FIX: yaml.safe_load() accepts no Loader argument, so the original
        # try-branch always raised TypeError and only the fallback ever ran.
        # A context manager also closes the config file handle.
        with open(args.cfg_file) as f:
            dataset_cfg = EasyDict(yaml.safe_load(f))
        ROOT_DIR = (Path(__file__).resolve().parent / '../../../').resolve()
        dataset_cfg.VERSION = args.version
        dataset_cfg.MAX_SWEEPS = args.max_sweeps
        create_lyft_info(
            version=dataset_cfg.VERSION,
            data_path=ROOT_DIR / 'data' / 'lyft',
            save_path=ROOT_DIR / 'data' / 'lyft',
            split=args.split,
            max_sweeps=dataset_cfg.MAX_SWEEPS
        )

        lyft_dataset = LyftDataset(
            dataset_cfg=dataset_cfg, class_names=None,
            root_path=ROOT_DIR / 'data' / 'lyft',
            logger=common_utils.create_logger(), training=True
        )
        lyft_dataset.create_groundtruth_database(max_sweeps=dataset_cfg.MAX_SWEEPS)
"""
modified from lyft toolkit https://github.com/lyft/nuscenes-devkit.git
"""
"""
mAP 3D calculation for the data in nuScenes format.
The input files are expected to have the following format:
Expected fields:
gt = [{
'sample_token': '0f0e3ce89d2324d8b45aa55a7b4f8207fbb039a550991a5149214f98cec136ac',
'translation': [974.2811881299899, 1714.6815014457964, -23.689857123368846],
'size': [1.796, 4.488, 1.664],
'rotation': [0.14882026466054782, 0, 0, 0.9888642620837121],
'name': 'car'
}]
prediction_result = {
'sample_token': '0f0e3ce89d2324d8b45aa55a7b4f8207fbb039a550991a5149214f98cec136ac',
'translation': [971.8343488872263, 1713.6816097857359, -25.82534357061308],
'size': [2.519726579986132, 7.810161372666739, 3.483438286096803],
'rotation': [0.10913582721095375, 0.04099572636992043, 0.01927712319721745, 1.029328402625659],
'name': 'car',
'score': 0.3077029437237213
}
input arguments:
--pred_file: file with predictions
--gt_file: ground truth file
--iou_threshold: IOU threshold
In general we would be interested in average of mAP at thresholds [0.5, 0.55, 0.6, 0.65,...0.95], similar to the
standard COCO => one needs to run this file N times for every IOU threshold independently.
"""
import argparse
import json
from collections import defaultdict
from pathlib import Path
import numpy as np
from pyquaternion import Quaternion
from shapely.geometry import Polygon
class Box3D:
    """Data class used during detection evaluation. Can be a prediction or ground truth."""

    def __init__(self, **kwargs):
        # Required keys: sample_token, translation (3), size (3, w/l/h),
        # rotation (4-element quaternion), name. Optional: score (default -1).
        sample_token = kwargs["sample_token"]
        translation = kwargs["translation"]
        size = kwargs["size"]
        rotation = kwargs["rotation"]
        name = kwargs["name"]
        score = kwargs.get("score", -1)

        if not isinstance(sample_token, str):
            raise TypeError("Sample_token must be a string!")

        if not len(translation) == 3:
            raise ValueError("Translation must have 3 elements!")

        if np.any(np.isnan(translation)):
            raise ValueError("Translation may not be NaN!")

        if not len(size) == 3:
            raise ValueError("Size must have 3 elements!")

        if np.any(np.isnan(size)):
            raise ValueError("Size may not be NaN!")

        if not len(rotation) == 4:
            raise ValueError("Rotation must have 4 elements!")

        if np.any(np.isnan(rotation)):
            raise ValueError("Rotation may not be NaN!")

        if name is None:
            raise ValueError("Name cannot be empty!")

        # Assign.
        self.sample_token = sample_token
        self.translation = translation
        self.size = size
        self.volume = np.prod(self.size)
        self.score = score

        # All box dimensions must be strictly positive.
        assert np.all([x > 0 for x in size])
        self.rotation = rotation
        self.name = name
        self.quaternion = Quaternion(self.rotation)

        self.width, self.length, self.height = size

        self.center_x, self.center_y, self.center_z = self.translation
        self.min_z = self.center_z - self.height / 2
        self.max_z = self.center_z + self.height / 2
        # Footprint polygon; computed once here and cached.
        self.ground_bbox_coords = None
        self.ground_bbox_coords = self.get_ground_bbox_coords()

    @staticmethod
    def check_orthogonal(a, b, c):
        """Check that vector (b - a) is orthogonal to the vector (c - a)."""
        return np.isclose((b[0] - a[0]) * (c[0] - a[0]) + (b[1] - a[1]) * (c[1] - a[1]), 0)

    def get_ground_bbox_coords(self):
        # Return the cached footprint polygon if it has already been computed.
        if self.ground_bbox_coords is not None:
            return self.ground_bbox_coords
        return self.calculate_ground_bbox_coords()

    def calculate_ground_bbox_coords(self):
        """We assume that the 3D box has lower plane parallel to the ground.

        Returns: Polygon with 4 points describing the base.
        """
        if self.ground_bbox_coords is not None:
            return self.ground_bbox_coords

        rotation_matrix = self.quaternion.rotation_matrix

        # cos/sin of the yaw angle taken from the rotation matrix.
        cos_angle = rotation_matrix[0, 0]
        sin_angle = rotation_matrix[1, 0]

        # Four footprint corners in the ground plane.
        point_0_x = self.center_x + self.length / 2 * cos_angle + self.width / 2 * sin_angle
        point_0_y = self.center_y + self.length / 2 * sin_angle - self.width / 2 * cos_angle

        point_1_x = self.center_x + self.length / 2 * cos_angle - self.width / 2 * sin_angle
        point_1_y = self.center_y + self.length / 2 * sin_angle + self.width / 2 * cos_angle

        point_2_x = self.center_x - self.length / 2 * cos_angle - self.width / 2 * sin_angle
        point_2_y = self.center_y - self.length / 2 * sin_angle + self.width / 2 * cos_angle

        point_3_x = self.center_x - self.length / 2 * cos_angle + self.width / 2 * sin_angle
        point_3_y = self.center_y - self.length / 2 * sin_angle - self.width / 2 * cos_angle

        point_0 = point_0_x, point_0_y
        point_1 = point_1_x, point_1_y
        point_2 = point_2_x, point_2_y
        point_3 = point_3_x, point_3_y

        # Sanity check: adjacent edges of the footprint must be perpendicular.
        assert self.check_orthogonal(point_0, point_1, point_3)
        assert self.check_orthogonal(point_1, point_0, point_2)
        assert self.check_orthogonal(point_2, point_1, point_3)
        assert self.check_orthogonal(point_3, point_0, point_2)

        # Closed polygon: the first point is repeated at the end.
        self.ground_bbox_coords = Polygon(
            [
                (point_0_x, point_0_y),
                (point_1_x, point_1_y),
                (point_2_x, point_2_y),
                (point_3_x, point_3_y),
                (point_0_x, point_0_y),
            ]
        )

        return self.ground_bbox_coords

    def get_height_intersection(self, other):
        # Overlap of the two boxes along the z axis (0 if disjoint).
        min_z = max(other.min_z, self.min_z)
        max_z = min(other.max_z, self.max_z)

        return max(0, max_z - min_z)

    def get_area_intersection(self, other) -> float:
        # Footprint overlap area; cannot exceed this box's own footprint.
        result = self.ground_bbox_coords.intersection(other.ground_bbox_coords).area

        assert result <= self.width * self.length

        return result

    def get_intersection(self, other) -> float:
        # Intersection volume = z-overlap * footprint-overlap area.
        height_intersection = self.get_height_intersection(other)

        area_intersection = self.ground_bbox_coords.intersection(other.ground_bbox_coords).area

        return height_intersection * area_intersection

    def get_iou(self, other):
        # 3D IoU = intersection volume / union volume, clipped to [0, 1].
        intersection = self.get_intersection(other)
        union = self.volume + other.volume - intersection

        iou = np.clip(intersection / union, 0, 1)

        return iou

    def __repr__(self):
        return str(self.serialize())

    def serialize(self) -> dict:
        """Returns: Serialized instance as dict."""

        return {
            "sample_token": self.sample_token,
            "translation": self.translation,
            "size": self.size,
            "rotation": self.rotation,
            "name": self.name,
            "volume": self.volume,
            "score": self.score,
        }
def group_by_key(detections, key):
    """Bucket a list of dicts by the value stored under ``key``.

    Returns a defaultdict(list) mapping each distinct value to the dicts
    carrying it, preserving input order within each bucket.
    """
    grouped = defaultdict(list)
    for record in detections:
        grouped[record[key]].append(record)
    return grouped
def wrap_in_box(input):
    """Convert every serialized box dict in ``input``'s values into a Box3D instance.

    Args:
        input: mapping of key -> list of Box3D keyword-argument dicts.

    Returns:
        New dict with the same keys and lists of Box3D objects as values.
    """
    return {key: [Box3D(**record) for record in records] for key, records in input.items()}
def get_envelope(precisions):
    """Compute the precision envelope.

    Replaces each entry (in place) with the maximum precision seen at any
    higher recall, making the curve monotonically non-increasing.

    Args:
        precisions: 1-D numpy array of precision values ordered by recall.

    Returns:
        The same array, enveloped.
    """
    for idx in reversed(range(1, precisions.size)):
        precisions[idx - 1] = np.maximum(precisions[idx - 1], precisions[idx])
    return precisions
def get_ap(recalls, precisions):
    """Calculate average precision.

    Args:
        recalls: 1-D array of cumulative recalls (ascending).
        precisions: 1-D array of matching precisions.

    Returns:
        float: area under the enveloped precision-recall curve.
    """
    # Sentinel values bound the curve at recall 0 and 1.
    recalls = np.concatenate(([0.0], recalls, [1.0]))
    precisions = np.concatenate(([0.0], precisions, [0.0]))

    # Precision envelope: running maximum taken from the right.
    precisions = np.maximum.accumulate(precisions[::-1])[::-1]

    # Points where the recall value changes mark the rectangles whose
    # widths (delta recall) * heights (precision) sum to the area.
    change = np.where(recalls[1:] != recalls[:-1])[0]
    return np.sum((recalls[change + 1] - recalls[change]) * precisions[change + 1])
def get_ious(gt_boxes, predicted_box):
    """Return the IoU of ``predicted_box`` against every ground-truth box, in order."""
    ious = []
    for gt_box in gt_boxes:
        ious.append(predicted_box.get_iou(gt_box))
    return ious
def recall_precision(gt, predictions, iou_threshold_list):
    """Compute cumulative recall/precision curves and per-threshold APs for one class.

    Args:
        gt: list of ground-truth box dicts (Box3D keyword args) for this class.
        predictions: list of prediction dicts; each must carry a 'score'.
        iou_threshold_list: IoU thresholds evaluated simultaneously (one column each).

    Returns:
        (recalls, precisions, ap_list) — arrays of shape (num_predictions,
        num_thresholds) plus one AP per threshold; or (-1, -1, -1) when there
        is no ground truth at all.
    """
    num_gts = len(gt)
    if num_gts == 0:
        return -1, -1, -1
    image_gts = group_by_key(gt, "sample_token")
    image_gts = wrap_in_box(image_gts)

    # One matched-flag per gt box per IoU threshold, keyed by sample token.
    sample_gt_checked = {sample_token: np.zeros((len(boxes), len(iou_threshold_list))) for sample_token, boxes in image_gts.items()}

    # Greedy matching in descending confidence order.
    predictions = sorted(predictions, key=lambda x: x["score"], reverse=True)

    # go down dets and mark TPs and FPs
    num_predictions = len(predictions)
    tp = np.zeros((num_predictions, len(iou_threshold_list)))
    fp = np.zeros((num_predictions, len(iou_threshold_list)))

    for prediction_index, prediction in enumerate(predictions):
        predicted_box = Box3D(**prediction)

        sample_token = prediction["sample_token"]

        max_overlap = -np.inf
        jmax = -1

        try:
            gt_boxes = image_gts[sample_token]  # gt_boxes per sample
            gt_checked = sample_gt_checked[sample_token]  # gt flags per sample
        except KeyError:
            # A prediction on a sample without any gt counts as FP below.
            gt_boxes = []
            gt_checked = None

        if len(gt_boxes) > 0:
            overlaps = get_ious(gt_boxes, predicted_box)

            max_overlap = np.max(overlaps)

            jmax = np.argmax(overlaps)

        for i, iou_threshold in enumerate(iou_threshold_list):
            if max_overlap > iou_threshold:
                if gt_checked[jmax, i] == 0:
                    tp[prediction_index, i] = 1.0
                    gt_checked[jmax, i] = 1
                else:
                    # Duplicate detection of an already-matched gt box is an FP.
                    fp[prediction_index, i] = 1.0
            else:
                fp[prediction_index, i] = 1.0

    # compute precision recall
    fp = np.cumsum(fp, axis=0)
    tp = np.cumsum(tp, axis=0)

    recalls = tp / float(num_gts)
    assert np.all(0 <= recalls) & np.all(recalls <= 1)

    # avoid divide by zero in case the first detection matches a difficult ground truth
    precisions = tp / np.maximum(tp + fp, np.finfo(np.float64).eps)
    assert np.all(0 <= precisions) & np.all(precisions <= 1)

    ap_list = []
    for i in range(len(iou_threshold_list)):
        recall = recalls[:, i]
        precision = precisions[:, i]
        ap = get_ap(recall, precision)
        ap_list.append(ap)

    return recalls, precisions, ap_list
def get_average_precisions(gt: list, predictions: list, class_names: list, iou_thresholds: list) -> np.array:
    """Returns an array with an average precision per class.

    Args:
        gt: list of ground-truth dictionaries (schema below).
        predictions: list of prediction dictionaries (schema below).
        class_names: list of the class names; fixes the output order.
        iou_thresholds: list of IOU thresholds used to calculate TP / FN.

    Returns an array with an average precision per class; classes that never
    appear among the predictions keep an AP of 0.

    Ground truth and predictions should have schema:

    gt = [{
    'sample_token': '0f0e3ce89d2324d8b45aa55a7b4f8207fbb039a550991a5149214f98cec136ac',
    'translation': [974.2811881299899, 1714.6815014457964, -23.689857123368846],
    'size': [1.796, 4.488, 1.664],
    'rotation': [0.14882026466054782, 0, 0, 0.9888642620837121],
    'name': 'car'
    }]

    predictions = [{
    'sample_token': '0f0e3ce89d2324d8b45aa55a7b4f8207fbb039a550991a5149214f98cec136ac',
    'translation': [971.8343488872263, 1713.6816097857359, -25.82534357061308],
    'size': [2.519726579986132, 7.810161372666739, 3.483438286096803],
    'rotation': [0.10913582721095375, 0.04099572636992043, 0.01927712319721745, 1.029328402625659],
    'name': 'car',
    'score': 0.3077029437237213
    }]
    """
    assert all([0 <= iou_th <= 1 for iou_th in iou_thresholds])

    gt_by_class_name = group_by_key(gt, "name")
    pred_by_class_name = group_by_key(predictions, "name")

    average_precisions = np.zeros(len(class_names))

    for class_id, class_name in enumerate(class_names):
        if class_name not in pred_by_class_name:
            continue
        _, _, ap_list = recall_precision(
            gt_by_class_name[class_name], pred_by_class_name[class_name], iou_thresholds
        )
        average_precisions[class_id] = np.mean(ap_list)

    return average_precisions
def get_class_names(gt: dict) -> list:
    """Get sorted list of class names.

    Args:
        gt: iterable of box dicts, each carrying a 'name' entry.

    Returns: Sorted list of the distinct class names.
    """
    return sorted({box["name"] for box in gt})
if __name__ == "__main__":
parser = argparse.ArgumentParser()
arg = parser.add_argument
arg("-p", "--pred_file", type=str, help="Path to the predictions file.", required=True)
arg("-g", "--gt_file", type=str, help="Path to the ground truth file.", required=True)
arg("-t", "--iou_threshold", type=float, help="iou threshold", default=0.5)
args = parser.parse_args()
gt_path = Path(args.gt_file)
pred_path = Path(args.pred_file)
with open(args.pred_file) as f:
predictions = json.load(f)
with open(args.gt_file) as f:
gt = json.load(f)
class_names = get_class_names(gt)
print("Class_names = ", class_names)
average_precisions = get_average_precisions(gt, predictions, class_names, args.iou_threshold)
mAP = np.mean(average_precisions)
print("Average per class mean average precision = ", mAP)
for class_id in sorted(list(zip(class_names, average_precisions.flatten().tolist()))):
print(class_id)
"""
The Lyft data pre-processing and evaluation is modified from
https://github.com/poodarchu/Det3D
"""
import operator
from functools import reduce
from pathlib import Path
import numpy as np
import tqdm
from lyft_dataset_sdk.utils.data_classes import Box, Quaternion
from lyft_dataset_sdk.lyftdataset import LyftDataset
from lyft_dataset_sdk.utils.geometry_utils import transform_matrix
from lyft_dataset_sdk.eval.detection.mAP_evaluation import Box3D
def get_available_scenes(lyft):
    """Return the scene records whose first lidar keyframe actually exists on disk.

    Args:
        lyft: lyft_dataset_sdk LyftDataset instance.

    Returns:
        List of scene records filtered to those with present lidar data.
    """
    available_scenes = []
    print('total scene num:', len(lyft.scene))
    for scene in lyft.scene:
        scene_token = scene['token']
        scene_rec = lyft.get('scene', scene_token)
        sample_rec = lyft.get('sample', scene_rec['first_sample_token'])
        sd_rec = lyft.get('sample_data', sample_rec['data']['LIDAR_TOP'])
        has_more_frames = True
        scene_not_exist = False
        # NOTE: this loop runs at most once — both branches break. Only the
        # first keyframe's lidar file is checked (walking 'next' is commented out).
        while has_more_frames:
            lidar_path, boxes, _ = lyft.get_sample_data(sd_rec['token'])
            if not Path(lidar_path).exists():
                scene_not_exist = True
                break
            else:
                break
            # if not sd_rec['next'] == '':
            #     sd_rec = nusc.get('sample_data', sd_rec['next'])
            # else:
            #     has_more_frames = False

        if scene_not_exist:
            continue
        available_scenes.append(scene)
    print('exist scene num:', len(available_scenes))
    return available_scenes
def get_sample_data(lyft, sample_data_token):
    """Fetch annotation boxes for a sample_data record, transformed into the sensor frame.

    Args:
        lyft: lyft_dataset_sdk LyftDataset instance.
        sample_data_token: token of the sample_data (e.g. a lidar keyframe).

    Returns:
        (box_list, pose_rec): boxes in sensor coordinates and the ego-pose record.
    """
    sd_rec = lyft.get("sample_data", sample_data_token)
    cs_rec = lyft.get("calibrated_sensor", sd_rec["calibrated_sensor_token"])
    sensor_rec = lyft.get("sensor", cs_rec["sensor_token"])
    pose_rec = lyft.get("ego_pose", sd_rec["ego_pose_token"])

    transformed_boxes = []
    for box in lyft.get_boxes(sample_data_token):
        # global frame -> ego vehicle frame
        box.translate(-np.array(pose_rec["translation"]))
        box.rotate(Quaternion(pose_rec["rotation"]).inverse)
        # ego vehicle frame -> sensor frame
        box.translate(-np.array(cs_rec["translation"]))
        box.rotate(Quaternion(cs_rec["rotation"]).inverse)
        transformed_boxes.append(box)

    return transformed_boxes, pose_rec
def quaternion_yaw(q: "Quaternion") -> float:
    """
    Calculate the yaw angle from a quaternion.
    Note that this only works for a quaternion that represents a box in lidar or global coordinate frame.
    It does not work for a box in the camera frame.
    :param q: Quaternion of interest.
    :return: Yaw angle in radians.
    """
    # Rotate the unit x-axis and project the result onto the xy plane;
    # the yaw is the angle of that projected direction.
    front = np.dot(q.rotation_matrix, np.array([1, 0, 0]))
    return np.arctan2(front[1], front[0])
def fill_trainval_infos(data_path, lyft, train_scenes, val_scenes, test=False, max_sweeps=10):
    """Build per-sample info dicts (lidar paths, transforms, sweeps, gt boxes).

    Args:
        data_path: dataset root; lidar paths are stored relative to it.
        lyft: lyft_dataset_sdk LyftDataset instance.
        train_scenes: set of scene tokens belonging to the train split.
        val_scenes: set of scene tokens belonging to the val split.
        test: when True, skip ground-truth annotation fields.
        max_sweeps: total sweeps per sample (keyframe + max_sweeps - 1 past sweeps).

    Returns:
        (train_lyft_infos, val_lyft_infos): samples not in train_scenes fall into val.
    """
    train_lyft_infos = []
    val_lyft_infos = []
    progress_bar = tqdm.tqdm(total=len(lyft.sample), desc='create_info', dynamic_ncols=True)

    # ref_chans = ["LIDAR_TOP", "LIDAR_FRONT_LEFT", "LIDAR_FRONT_RIGHT"]
    ref_chan = "LIDAR_TOP"

    for index, sample in enumerate(lyft.sample):
        progress_bar.update()

        ref_info = {}
        ref_sd_token = sample["data"][ref_chan]
        ref_sd_rec = lyft.get("sample_data", ref_sd_token)
        ref_cs_token = ref_sd_rec["calibrated_sensor_token"]
        ref_cs_rec = lyft.get("calibrated_sensor", ref_cs_token)

        # Sensor <-> ego-vehicle transforms for the reference keyframe.
        ref_to_car = transform_matrix(
            ref_cs_rec["translation"],
            Quaternion(ref_cs_rec["rotation"]),
            inverse=False,
        )

        ref_from_car = transform_matrix(
            ref_cs_rec["translation"],
            Quaternion(ref_cs_rec["rotation"]),
            inverse=True,
        )

        ref_lidar_path = lyft.get_sample_data_path(ref_sd_token)

        ref_boxes, ref_pose_rec = get_sample_data(lyft, ref_sd_token)

        # Keyframe timestamp in seconds (raw timestamps are microseconds).
        ref_time = 1e-6 * ref_sd_rec["timestamp"]

        # Ego-vehicle <-> global transforms at the keyframe pose.
        car_from_global = transform_matrix(
            ref_pose_rec["translation"],
            Quaternion(ref_pose_rec["rotation"]),
            inverse=True,
        )

        car_to_global = transform_matrix(
            ref_pose_rec["translation"],
            Quaternion(ref_pose_rec["rotation"]),
            inverse=False,
        )

        info = {
            "lidar_path": Path(ref_lidar_path).relative_to(data_path).__str__(),
            "ref_from_car": ref_from_car,
            "ref_to_car": ref_to_car,
            'token': sample['token'],
            'car_from_global': car_from_global,
            'car_to_global': car_to_global,
            'timestamp': ref_time,
            'sweeps': []
        }

        sample_data_token = sample['data'][ref_chan]
        curr_sd_rec = lyft.get('sample_data', sample_data_token)
        sweeps = []

        # Walk backwards through previous sweeps until max_sweeps - 1 are collected.
        while len(sweeps) < max_sweeps - 1:
            if curr_sd_rec['prev'] == '':
                # No earlier sweep: pad with the keyframe itself (identity
                # transform, zero time lag), then repeat the last entry.
                if len(sweeps) == 0:
                    sweep = {
                        'lidar_path': Path(ref_lidar_path).relative_to(data_path).__str__(),
                        'sample_data_token': curr_sd_rec['token'],
                        'transform_matrix': None,
                        'time_lag': curr_sd_rec['timestamp'] * 0,
                    }
                    sweeps.append(sweep)
                else:
                    sweeps.append(sweeps[-1])
            else:
                curr_sd_rec = lyft.get('sample_data', curr_sd_rec['prev'])

                # Get past pose
                current_pose_rec = lyft.get('ego_pose', curr_sd_rec['ego_pose_token'])
                global_from_car = transform_matrix(
                    current_pose_rec['translation'], Quaternion(current_pose_rec['rotation']), inverse=False,
                )

                # Homogeneous transformation matrix from sensor coordinate frame to ego car frame.
                current_cs_rec = lyft.get(
                    'calibrated_sensor', curr_sd_rec['calibrated_sensor_token']
                )
                car_from_current = transform_matrix(
                    current_cs_rec['translation'], Quaternion(current_cs_rec['rotation']), inverse=False,
                )

                # Chain: past sensor -> past car -> global -> keyframe car -> keyframe sensor.
                tm = reduce(np.dot, [ref_from_car, car_from_global, global_from_car, car_from_current])

                lidar_path = lyft.get_sample_data_path(curr_sd_rec['token'])

                time_lag = ref_time - 1e-6 * curr_sd_rec['timestamp']

                sweep = {
                    'lidar_path': Path(lidar_path).relative_to(data_path).__str__(),
                    'sample_data_token': curr_sd_rec['token'],
                    'transform_matrix': tm,
                    'global_from_car': global_from_car,
                    'car_from_current': car_from_current,
                    'time_lag': time_lag,
                }
                sweeps.append(sweep)

        info['sweeps'] = sweeps

        if not test:
            annotations = [
                lyft.get("sample_annotation", token) for token in sample["anns"]
            ]

            # Boxes in sensor frame: centers, dims reordered wlh -> lwh, yaw.
            locs = np.array([b.center for b in ref_boxes]).reshape(-1, 3)
            dims = np.array([b.wlh for b in ref_boxes]).reshape(-1, 3)[:, [1, 0, 2]]
            rots = np.array([quaternion_yaw(b.orientation) for b in ref_boxes]).reshape(
                -1, 1
            )
            velocity = np.array([b.velocity for b in ref_boxes]).reshape(-1, 3)
            names = np.array([b.name for b in ref_boxes])
            tokens = np.array([b.token for b in ref_boxes]).reshape(-1, 1)
            gt_boxes = np.concatenate([locs, dims, rots], axis=1)

            assert len(annotations) == len(gt_boxes)

            info["gt_boxes"] = gt_boxes
            info["gt_boxes_velocity"] = velocity
            info["gt_names"] = names
            info["gt_boxes_token"] = tokens

        if sample["scene_token"] in train_scenes:
            train_lyft_infos.append(info)
        else:
            val_lyft_infos.append(info)

    progress_bar.close()
    return train_lyft_infos, val_lyft_infos
def boxes_lidar_to_lyft(boxes3d, scores=None, labels=None):
    """Convert (N, 7) lidar boxes [x, y, z, dx, dy, dz, heading] to lyft-sdk Box objects.

    Args:
        boxes3d: (N, 7) numpy array of boxes in lidar coordinates.
        scores: optional (N,) confidence scores; np.nan when absent.
        labels: optional (N,) integer labels; np.nan when absent.

    Returns:
        List of N lyft_dataset_sdk Box objects.
    """
    converted = []
    for i in range(boxes3d.shape[0]):
        yaw_quat = Quaternion(axis=[0, 0, 1], radians=boxes3d[i, 6])
        converted.append(Box(
            boxes3d[i, :3],
            boxes3d[i, [4, 3, 5]],  # dataset order (dx, dy, dz) -> sdk wlh
            yaw_quat,
            label=labels[i] if labels is not None else np.nan,
            score=scores[i] if scores is not None else np.nan,
        ))
    return converted
def lidar_lyft_box_to_global(lyft, boxes, sample_token):
    """Transform lyft-sdk boxes from the lidar sensor frame to the global frame.

    Args:
        lyft: lyft_dataset_sdk LyftDataset instance.
        boxes: list of Box objects in lidar coordinates (mutated in place).
        sample_token: token of the sample the boxes belong to.

    Returns:
        The same boxes, now in global coordinates.
    """
    s_record = lyft.get('sample', sample_token)
    sample_data_token = s_record['data']['LIDAR_TOP']

    sd_record = lyft.get('sample_data', sample_data_token)
    cs_record = lyft.get('calibrated_sensor', sd_record['calibrated_sensor_token'])
    sensor_record = lyft.get('sensor', cs_record['sensor_token'])
    pose_record = lyft.get('ego_pose', sd_record['ego_pose_token'])

    transformed = []
    for box in boxes:
        # sensor frame -> ego vehicle frame
        box.rotate(Quaternion(cs_record['rotation']))
        box.translate(np.array(cs_record['translation']))
        # ego vehicle frame -> global frame
        box.rotate(Quaternion(pose_record['rotation']))
        box.translate(np.array(pose_record['translation']))
        transformed.append(box)
    return transformed
def convert_det_to_lyft_format(lyft, det_annos):
    """Convert OpenPCDet detection annos into lyft evaluation dicts in the global frame.

    Args:
        lyft: lyft_dataset_sdk LyftDataset instance.
        det_annos: list of per-sample detection dicts with 'boxes_lidar',
            'score', 'pred_labels', 'name' and 'metadata'['token'].

    Returns:
        (det_lyft_box, sample_tokens): serialized box dicts and one sample
        token per processed annotation.
    """
    sample_tokens = []
    det_lyft_box = []
    for anno in det_annos:
        token = anno['metadata']['token']
        sample_tokens.append(token)

        lyft_boxes = boxes_lidar_to_lyft(anno['boxes_lidar'], anno['score'], anno['pred_labels'])
        global_boxes = lidar_lyft_box_to_global(lyft, lyft_boxes, token)

        for idx, box in enumerate(global_boxes):
            det_lyft_box.append({
                'sample_token': token,
                'translation': box.center.tolist(),
                'size': box.wlh.tolist(),
                'rotation': box.orientation.elements.tolist(),
                'name': anno['name'][idx],
                'score': box.score
            })
    return det_lyft_box, sample_tokens
def load_lyft_gt_by_tokens(lyft, sample_tokens):
    """
    Modify from Lyft tutorial

    Collect every ground-truth annotation (in global coordinates) for the
    given sample tokens as serialized box dicts.
    """
    gt_box3ds = []

    # Load annotations and filter predictions and annotations.
    for sample_token in sample_tokens:
        sample = lyft.get('sample', sample_token)

        sample_lidar_token = sample["data"]["LIDAR_TOP"]
        lidar_data = lyft.get("sample_data", sample_lidar_token)
        ego_pose = lyft.get("ego_pose", lidar_data["ego_pose_token"])
        ego_translation = np.array(ego_pose['translation'])

        for annotation_token in sample['anns']:
            annotation = lyft.get('sample_annotation', annotation_token)
            gt_box3ds.append({
                'sample_token': sample_token,
                'translation': annotation['translation'],
                'size': annotation['size'],
                'rotation': annotation['rotation'],
                'name': annotation['category_name']
            })

    return gt_box3ds
def format_lyft_results(classwise_ap, class_names, iou_threshold_list, version='trainval'):
    """Render per-class APs and their mean as a printable report plus a result dict.

    Args:
        classwise_ap: array of per-class average precisions.
        class_names: class names aligned with classwise_ap.
        iou_threshold_list: IoU thresholds the APs were averaged over.
        version: dataset version tag shown in the report header.

    Returns:
        (result, ret_dict): formatted report string and {class: ap, 'mAP': mean}.
    """
    ret_dict = {}
    lines = ['----------------Lyft %s results-----------------\n' % version]
    lines.append('Average precision over IoUs: {}\n'.format(str(iou_threshold_list)))
    for class_idx, class_name in enumerate(class_names):
        lines.append('{:<20}: \t {:.4f}\n'.format(class_name, classwise_ap[class_idx]))
        ret_dict[class_name] = classwise_ap[class_idx]

    lines.append('--------------average performance-------------\n')
    mAP = np.mean(classwise_ap)
    lines.append('mAP:\t {:.4f}\n'.format(mAP))

    ret_dict['mAP'] = mAP
    return ''.join(lines), ret_dict
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment