"docs/basic_usage/offline_engine_api.ipynb" did not exist on "fac17acf084a2800e5442ca548887f05e17946e2"
Commit afe88104 authored by lishj6's avatar lishj6 🏸
Browse files

init0905

parent a48c4071
import prettytable
from typing import Dict, List, Optional
from time import time
from copy import deepcopy
from multiprocessing import Pool
from logging import Logger
from functools import partial, cached_property
import numpy as np
from numpy.typing import NDArray
from shapely.geometry import LineString
import mmcv
from mmcv import Config
from mmdet.datasets import build_dataset, build_dataloader
from .AP import instance_match, average_precision
INTERP_NUM = 200 # number of points to interpolate during evaluation
THRESHOLDS = [0.5, 1.0, 1.5] # AP thresholds
N_WORKERS = 16 # number of parallel workers
class VectorEvaluate(object):
"""Evaluator for vectorized map.
Args:
dataset_cfg (Config): dataset cfg for gt
n_workers (int): num workers to parallel
"""
def __init__(self, dataset_cfg: Config, n_workers: int=N_WORKERS) -> None:
self.dataset = build_dataset(dataset_cfg)
self.dataloader = build_dataloader(
self.dataset, samples_per_gpu=1, workers_per_gpu=n_workers, shuffle=False, dist=False)
classes = self.dataset.MAP_CLASSES
self.cat2id = {cls: i for i, cls in enumerate(classes)}
self.id2cat = {v: k for k, v in self.cat2id.items()}
self.n_workers = n_workers
self.thresholds = [0.5, 1.0, 1.5]
@cached_property
def gts(self) -> Dict[str, Dict[int, List[NDArray]]]:
print('collecting gts...')
gts = {}
pbar = mmcv.ProgressBar(len(self.dataloader))
for data in self.dataloader:
token = deepcopy(data['img_metas'].data[0][0]['token'])
gt = deepcopy(data['vectors'].data[0][0])
gts[token] = gt
pbar.update()
del data # avoid dataloader memory crash
return gts
def interp_fixed_num(self,
vector: NDArray,
num_pts: int) -> NDArray:
''' Interpolate a polyline to a fixed number of points.
Args:
vector (array): line coordinates, shape (M, 2)
num_pts (int): number of points to sample
Returns:
sampled_points (array): interpolated coordinates, shape (num_pts, 2)
'''
line = LineString(vector)
distances = np.linspace(0, line.length, num_pts)
sampled_points = np.array([list(line.interpolate(distance).coords)
for distance in distances]).squeeze()
return sampled_points
def interp_fixed_dist(self,
vector: NDArray,
sample_dist: float) -> NDArray:
''' Interpolate a polyline at a fixed interval.
Args:
vector (array): line coordinates, shape (M, 2)
sample_dist (float): sample interval
Returns:
sampled_points (array): interpolated points, shape (N, 2)
'''
line = LineString(vector)
distances = list(np.arange(sample_dist, line.length, sample_dist))
# make sure to sample at least two points when sample_dist > line.length
distances = [0,] + distances + [line.length,]
sampled_points = np.array([list(line.interpolate(distance).coords)
for distance in distances]).squeeze()
return sampled_points
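# Illustrative example (not part of the original code): for a straight 10 m
# segment v = np.array([[0.0, 0.0], [10.0, 0.0]]),
#   interp_fixed_num(v, 5)      samples x = [0.0, 2.5, 5.0, 7.5, 10.0]
#   interp_fixed_dist(v, 3.0)   samples x = [0.0, 3.0, 6.0, 9.0, 10.0]
# i.e. fixed-num spaces points evenly, while fixed-dist steps every
# `sample_dist` and always includes both endpoints.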
def _evaluate_single(self,
pred_vectors: List,
scores: List,
groundtruth: List,
thresholds: List,
metric: str='chamfer') -> Dict[float, NDArray]:
''' Do single-frame matching for one class.
Args:
pred_vectors (List): List[vector(ndarray)], vectors may have different lengths
scores (List): List[score(float)]
groundtruth (List): List of gt vectors
thresholds (List): List of distance thresholds
metric (str): distance metric. Default: 'chamfer'
Returns:
tp_fp_score_by_thr (Dict): matching results at different thresholds,
e.g. {0.5: (M, 3), 1.0: (M, 3), 1.5: (M, 3)}, columns are (tp, fp, score)
'''
pred_lines = []
# interpolate predictions
for vector in pred_vectors:
vector = np.array(vector)
vector_interp = self.interp_fixed_num(vector, INTERP_NUM)
pred_lines.append(vector_interp)
if pred_lines:
pred_lines = np.stack(pred_lines)
else:
pred_lines = np.zeros((0, INTERP_NUM, 2))
# interpolate groundtruth
gt_lines = []
for vector in groundtruth:
vector_interp = self.interp_fixed_num(vector, INTERP_NUM)
gt_lines.append(vector_interp)
if gt_lines:
gt_lines = np.stack(gt_lines)
else:
gt_lines = np.zeros((0, INTERP_NUM, 2))
scores = np.array(scores)
tp_fp_list = instance_match(pred_lines, scores, gt_lines, thresholds, metric) # List[(tp, fp)], one per threshold
tp_fp_score_by_thr = {}
for i, thr in enumerate(thresholds):
tp, fp = tp_fp_list[i]
tp_fp_score = np.hstack([tp[:, None], fp[:, None], scores[:, None]])
tp_fp_score_by_thr[thr] = tp_fp_score
return tp_fp_score_by_thr # {0.5: (M, 3), 1.0: (M, 3), 1.5: (M, 3)}
def evaluate(self,
result_path: str,
metric: str='chamfer',
logger: Optional[Logger]=None) -> Dict[str, float]:
''' Evaluate a submission file and print evaluation results to `logger` if specified.
The submission is aligned to the ground truth by sample token before evaluation; multiple workers are used to speed up matching.
Args:
result_path (str): path to submission file
metric (str): distance metric. Default: 'chamfer'
logger (Logger): logger to print evaluation result, Default: None
Returns:
new_result_dict (Dict): evaluation results. AP by categories.
'''
results = mmcv.load(result_path)
results = results['results']
# re-group samples and gt by label
samples_by_cls = {label: [] for label in self.id2cat.keys()}
num_gts = {label: 0 for label in self.id2cat.keys()}
num_preds = {label: 0 for label in self.id2cat.keys()}
# align by token
for token, gt in self.gts.items():
if token in results.keys():
pred = results[token]
else:
pred = {'vectors': [], 'scores': [], 'labels': []}
# for every sample
vectors_by_cls = {label: [] for label in self.id2cat.keys()}
scores_by_cls = {label: [] for label in self.id2cat.keys()}
for i in range(len(pred['labels'])):
# i-th pred line in sample
label = pred['labels'][i]
vector = pred['vectors'][i]
score = pred['scores'][i]
vectors_by_cls[label].append(vector)
scores_by_cls[label].append(score)
for label in self.id2cat.keys():
new_sample = (vectors_by_cls[label], scores_by_cls[label], gt[label])
num_gts[label] += len(gt[label])
num_preds[label] += len(scores_by_cls[label])
samples_by_cls[label].append(new_sample)
result_dict = {}
print(f'\nevaluating {len(self.id2cat)} categories...')
start = time()
if self.n_workers > 0:
pool = Pool(self.n_workers)
sum_mAP = 0
pbar = mmcv.ProgressBar(len(self.id2cat))
for label in self.id2cat.keys():
samples = samples_by_cls[label] # List[(pred_lines, scores, gts)]
result_dict[self.id2cat[label]] = {
'num_gts': num_gts[label],
'num_preds': num_preds[label]
}
sum_AP = 0
fn = partial(self._evaluate_single, thresholds=self.thresholds, metric=metric)
if self.n_workers > 0 and len(samples) > 81:
tpfp_score_list = pool.starmap(fn, samples)
else:
tpfp_score_list = []
for sample in samples:
tpfp_score_list.append(fn(*sample))
for thr in self.thresholds:
tp_fp_score = [i[thr] for i in tpfp_score_list]
tp_fp_score = np.vstack(tp_fp_score) # (num_dets, 3)
sort_inds = np.argsort(-tp_fp_score[:, -1])
tp = tp_fp_score[sort_inds, 0] # (num_dets,)
fp = tp_fp_score[sort_inds, 1] # (num_dets,)
tp = np.cumsum(tp, axis=0)
fp = np.cumsum(fp, axis=0)
eps = np.finfo(np.float32).eps
recalls = tp / np.maximum(num_gts[label], eps)
precisions = tp / np.maximum((tp + fp), eps)
AP = average_precision(recalls, precisions, 'area')
sum_AP += AP
result_dict[self.id2cat[label]].update({f'AP@{thr}': AP})
pbar.update()
AP = sum_AP / len(self.thresholds)
sum_mAP += AP
result_dict[self.id2cat[label]].update({'AP': AP})
if self.n_workers > 0:
pool.close()
mAP = sum_mAP / len(self.id2cat.keys())
result_dict.update({'mAP': mAP})
print(f"finished in {time() - start:.2f}s")
# print results
table = prettytable.PrettyTable(['category', 'num_preds', 'num_gts'] +
[f'AP@{thr}' for thr in self.thresholds] + ['AP'])
for label in self.id2cat.keys():
table.add_row([
self.id2cat[label],
result_dict[self.id2cat[label]]['num_preds'],
result_dict[self.id2cat[label]]['num_gts'],
*[round(result_dict[self.id2cat[label]][f'AP@{thr}'], 4) for thr in self.thresholds],
round(result_dict[self.id2cat[label]]['AP'], 4),
])
from mmcv.utils import print_log
print_log('\n'+str(table), logger=logger)
mAP_normal = 0
for label in self.id2cat.keys():
for thr in self.thresholds:
mAP_normal += result_dict[self.id2cat[label]][f'AP@{thr}']
mAP_normal = mAP_normal / (len(self.id2cat) * len(self.thresholds)) # e.g. 3 classes x 3 thresholds = 9
print_log(f'mAP_normal = {mAP_normal:.4f}\n', logger=logger)
# print_log(f'mAP_hard = {mAP_easy:.4f}\n', logger=logger)
new_result_dict = {}
for name in self.cat2id:
new_result_dict[name] = result_dict[name]['AP']
new_result_dict['mAP_normal'] = mAP_normal
return new_result_dict
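# Usage sketch (illustrative; config and file names are placeholders). Assumes
# a dataset config for the ground truth and a submission file laid out as
# {'results': {sample_token: {'vectors': [...], 'scores': [...], 'labels': [...]}}}:
#   evaluator = VectorEvaluate(cfg.data.val, n_workers=8)
#   metrics = evaluator.evaluate('work_dirs/submission.pkl', metric='chamfer')
#   print(metrics['mAP_normal'])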
# nuScenes dev-kit.
# Code written by Holger Caesar & Oscar Beijbom, 2018.
import argparse
import json
import os
import random
import time
import tqdm
from typing import Tuple, Dict, Any
import numpy as np
from nuscenes import NuScenes
from nuscenes.eval.common.config import config_factory
from nuscenes.eval.common.data_classes import EvalBoxes
from nuscenes.eval.common.loaders import add_center_dist, filter_eval_boxes
from nuscenes.eval.detection.algo import calc_ap, calc_tp # accumulate is provided by .motion_utils below
from nuscenes.eval.detection.constants import DETECTION_NAMES, ATTRIBUTE_NAMES, TP_METRICS
from nuscenes.eval.detection.data_classes import DetectionConfig, DetectionMetrics, DetectionBox, \
DetectionMetricDataList, DetectionMetricData
from nuscenes.eval.detection.render import summary_plot, class_pr_curve, class_tp_curve, dist_pr_curve, visualize_sample
from nuscenes.prediction import PredictHelper, convert_local_coords_to_global
from nuscenes.utils.splits import create_splits_scenes
from nuscenes.eval.detection.utils import category_to_detection_name
from nuscenes.eval.common.utils import quaternion_yaw, Quaternion
from nuscenes.eval.common.utils import center_distance, scale_iou, yaw_diff, velocity_l2, attr_acc, cummean
from .motion_utils import MotionBox, load_prediction, load_gt, accumulate
MOTION_TP_METRICS = ['min_ade_err', 'min_fde_err', 'miss_rate_err']
class MotionEval:
"""
This is the official nuScenes detection evaluation code.
Results are written to the provided output_dir.
nuScenes uses the following detection metrics:
- Mean Average Precision (mAP): Uses center-distance as matching criterion; averaged over distance thresholds.
- True Positive (TP) metrics: Average of translation, velocity, scale, orientation and attribute errors.
- nuScenes Detection Score (NDS): The weighted sum of the above.
Here is an overview of the functions in this method:
- init: Loads GT annotations and predictions stored in JSON format and filters the boxes.
- run: Performs evaluation and dumps the metric data to disk.
- render: Renders various plots and dumps to disk.
We assume that:
- Every sample_token is given in the results, although there may be not predictions for that sample.
Please see https://www.nuscenes.org/object-detection for more details.
"""
def __init__(self,
nusc: NuScenes,
config: DetectionConfig,
result_path: str,
eval_set: str,
output_dir: str = None,
verbose: bool = True,
seconds: int = 12):
"""
Initialize a DetectionEval object.
:param nusc: A NuScenes object.
:param config: A DetectionConfig object.
:param result_path: Path of the nuScenes JSON result file.
:param eval_set: The dataset split to evaluate on, e.g. train, val or test.
:param output_dir: Folder to save plots and results to.
:param verbose: Whether to print to stdout.
"""
self.nusc = nusc
self.result_path = result_path
self.eval_set = eval_set
self.output_dir = output_dir
self.verbose = verbose
self.cfg = config
# Check result file exists.
# assert os.path.exists(result_path), 'Error: The result file does not exist!'
# Make dirs.
self.plot_dir = os.path.join(self.output_dir, 'plots')
if not os.path.isdir(self.output_dir):
os.makedirs(self.output_dir)
if not os.path.isdir(self.plot_dir):
os.makedirs(self.plot_dir)
# Load data.
if verbose:
print('Initializing nuScenes detection evaluation')
self.pred_boxes, self.meta = load_prediction(self.result_path, self.cfg.max_boxes_per_sample, MotionBox,
verbose=verbose)
self.gt_boxes = load_gt(self.nusc, self.eval_set, MotionBox, verbose=verbose, seconds=seconds)
assert set(self.pred_boxes.sample_tokens) == set(self.gt_boxes.sample_tokens), \
"Samples in split doesn't match samples in predictions."
# Add center distances.
self.pred_boxes = add_center_dist(nusc, self.pred_boxes)
self.gt_boxes = add_center_dist(nusc, self.gt_boxes)
# Filter boxes (distance, points per box, etc.).
if verbose:
print('Filtering predictions')
self.pred_boxes = filter_eval_boxes(nusc, self.pred_boxes, self.cfg.class_range, verbose=verbose)
if verbose:
print('Filtering ground truth annotations')
self.gt_boxes = filter_eval_boxes(nusc, self.gt_boxes, self.cfg.class_range, verbose=verbose)
self.sample_tokens = self.gt_boxes.sample_tokens
def evaluate(self) -> Tuple[DetectionMetrics, DetectionMetricDataList]:
"""
Performs the actual evaluation.
:return: A tuple of high-level and the raw metric data.
"""
start_time = time.time()
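# NOTE: motion evaluation restricts matching to two classes and a single
# 2.0 m center-distance threshold, overriding whatever the detection config specifies.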
self.cfg.class_names = ['car', 'pedestrian']
self.cfg.dist_ths = [2.0]
# -----------------------------------
# Step 1: Accumulate metric data for all classes and distance thresholds.
# -----------------------------------
if self.verbose:
print('Accumulating metric data...')
metric_data_list = DetectionMetricDataList()
metrics = {}
for class_name in self.cfg.class_names:
for dist_th in self.cfg.dist_ths:
md, EPA, EPA_ = accumulate(self.gt_boxes, self.pred_boxes, class_name, self.cfg.dist_fcn_callable, dist_th)
metric_data_list.set(class_name, dist_th, md)
metrics[f'{class_name}_EPA'] = EPA_
# -----------------------------------
# Step 2: Calculate metrics from the data.
# -----------------------------------
if self.verbose:
print('Calculating metrics...')
for class_name in self.cfg.class_names:
# Compute TP metrics.
for metric_name in MOTION_TP_METRICS:
metric_data = metric_data_list[(class_name, self.cfg.dist_th_tp)]
tp = calc_tp(metric_data, self.cfg.min_recall, metric_name)
metrics[f'{class_name}_{metric_name}'] = tp
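# the metrics dict now holds '{class}_EPA' plus '{class}_min_ade_err',
# '{class}_min_fde_err' and '{class}_miss_rate_err' for each evaluated class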
return metrics, metric_data_list
def render(self, metrics: DetectionMetrics, md_list: DetectionMetricDataList) -> None:
"""
Renders various PR and TP curves.
:param metrics: DetectionMetrics instance.
:param md_list: DetectionMetricDataList instance.
"""
if self.verbose:
print('Rendering PR and TP curves')
def savepath(name):
return os.path.join(self.plot_dir, name + '.pdf')
summary_plot(md_list, metrics, min_precision=self.cfg.min_precision, min_recall=self.cfg.min_recall,
dist_th_tp=self.cfg.dist_th_tp, savepath=savepath('summary'))
for detection_name in self.cfg.class_names:
class_pr_curve(md_list, metrics, detection_name, self.cfg.min_precision, self.cfg.min_recall,
savepath=savepath(detection_name + '_pr'))
class_tp_curve(md_list, metrics, detection_name, self.cfg.min_recall, self.cfg.dist_th_tp,
savepath=savepath(detection_name + '_tp'))
for dist_th in self.cfg.dist_ths:
dist_pr_curve(md_list, metrics, dist_th, self.cfg.min_precision, self.cfg.min_recall,
savepath=savepath('dist_pr_' + str(dist_th)))
def main(self,
plot_examples: int = 0,
render_curves: bool = True) -> Dict[str, Any]:
"""
Main function that loads the evaluation code, visualizes samples, runs the evaluation and renders stat plots.
:param plot_examples: How many example visualizations to write to disk.
:param render_curves: Whether to render PR and TP curves to disk.
:return: A dict that stores the high-level metrics and meta data.
"""
if plot_examples > 0:
# Select a random but fixed subset to plot.
random.seed(42)
sample_tokens = list(self.sample_tokens)
random.shuffle(sample_tokens)
sample_tokens = sample_tokens[:plot_examples]
# Visualize samples.
example_dir = os.path.join(self.output_dir, 'examples')
if not os.path.isdir(example_dir):
os.mkdir(example_dir)
for sample_token in sample_tokens:
visualize_sample(self.nusc,
sample_token,
self.gt_boxes if self.eval_set != 'test' else EvalBoxes(),
# Don't render test GT.
self.pred_boxes,
eval_range=max(self.cfg.class_range.values()),
savepath=os.path.join(example_dir, '{}.png'.format(sample_token)))
# Run evaluation.
metrics, metric_data_list = self.evaluate()
return metrics
class NuScenesEval(MotionEval):
"""
Dummy class for backward-compatibility. Same as MotionEval.
"""
if __name__ == "__main__":
# Settings.
parser = argparse.ArgumentParser(description='Evaluate nuScenes detection results.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('result_path', type=str, help='The submission as a JSON file.')
parser.add_argument('--output_dir', type=str, default='~/nuscenes-metrics',
help='Folder to store result metrics, graphs and example visualizations.')
parser.add_argument('--eval_set', type=str, default='val',
help='Which dataset split to evaluate on, train, val or test.')
parser.add_argument('--dataroot', type=str, default='/data/sets/nuscenes',
help='Default nuScenes data directory.')
parser.add_argument('--version', type=str, default='v1.0-trainval',
help='Which version of the nuScenes dataset to evaluate on, e.g. v1.0-trainval.')
parser.add_argument('--config_path', type=str, default='',
help='Path to the configuration file.'
'If no path given, the CVPR 2019 configuration will be used.')
parser.add_argument('--plot_examples', type=int, default=10,
help='How many example visualizations to write to disk.')
parser.add_argument('--render_curves', type=int, default=1,
help='Whether to render PR and TP curves to disk.')
parser.add_argument('--verbose', type=int, default=1,
help='Whether to print to stdout.')
args = parser.parse_args()
result_path_ = os.path.expanduser(args.result_path)
output_dir_ = os.path.expanduser(args.output_dir)
eval_set_ = args.eval_set
dataroot_ = args.dataroot
version_ = args.version
config_path = args.config_path
plot_examples_ = args.plot_examples
render_curves_ = bool(args.render_curves)
verbose_ = bool(args.verbose)
if config_path == '':
cfg_ = config_factory('detection_cvpr_2019')
else:
with open(config_path, 'r') as _f:
cfg_ = DetectionConfig.deserialize(json.load(_f))
nusc_ = NuScenes(version=version_, verbose=verbose_, dataroot=dataroot_)
nusc_eval = NuScenesEval(nusc_, config=cfg_, result_path=result_path_, eval_set=eval_set_,
output_dir=output_dir_, verbose=verbose_)
nusc_eval.main(plot_examples=plot_examples_, render_curves=render_curves_)
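# Example invocation (script name and paths are placeholders):
#   python motion_eval.py results/motion_results.json \
#       --output_dir ~/nuscenes-metrics --eval_set val \
#       --dataroot /data/sets/nuscenes --version v1.0-trainval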
from tqdm import tqdm
import torch
import torch.nn as nn
import numpy as np
from shapely.geometry import Polygon
from mmcv.utils import print_log
from mmdet.datasets import build_dataset, build_dataloader
from projects.mmdet3d_plugin.datasets.utils import box3d_to_corners
def check_collision(ego_box, boxes):
'''
ego_box: tensor with shape [7], [x, y, z, w, l, h, yaw]
boxes: tensor with shape [N, 7]
'''
if boxes.shape[0] == 0:
return False
# following UniAD, shift the ego box 0.5 m forward along its heading
ego_box[0] += 0.5 * torch.cos(ego_box[6])
ego_box[1] += 0.5 * torch.sin(ego_box[6])
ego_corners_box = box3d_to_corners(ego_box.unsqueeze(0))[0, [0, 3, 7, 4], :2]
corners_box = box3d_to_corners(boxes)[:, [0, 3, 7, 4], :2]
ego_poly = Polygon([(point[0], point[1]) for point in ego_corners_box])
for i in range(len(corners_box)):
box_poly = Polygon([(point[0], point[1]) for point in corners_box[i]])
collision = ego_poly.intersects(box_poly)
if collision:
return True
return False
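# Illustrative check (hypothetical box values, using the [x, y, z, w, l, h, yaw]
# convention documented above): two identical boxes at the origin must collide.
#   ego = torch.tensor([0., 0., 0., 2., 5., 1.5, 0.])
#   assert check_collision(ego.clone(), ego.unsqueeze(0).clone())
# note that check_collision shifts ego_box in place, hence the clone().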
def get_yaw(traj):
start = traj[0]
end = traj[-1]
dist = torch.linalg.norm(end - start, dim=-1)
if dist < 0.5:
return traj.new_ones(traj.shape[0]) * np.pi / 2
zeros = traj.new_zeros((1, 2))
traj_cat = torch.cat([zeros, traj], dim=0)
yaw = traj.new_zeros(traj.shape[0]+1)
yaw[..., 1:-1] = torch.atan2(
traj_cat[..., 2:, 1] - traj_cat[..., :-2, 1],
traj_cat[..., 2:, 0] - traj_cat[..., :-2, 0],
)
yaw[..., -1] = torch.atan2(
traj_cat[..., -1, 1] - traj_cat[..., -2, 1],
traj_cat[..., -1, 0] - traj_cat[..., -2, 0],
)
return yaw[1:]
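# get_yaw estimates per-step heading via central differences over the
# trajectory padded with a leading origin point, falling back to a constant
# heading for near-static trajectories (displacement < 0.5 m). For example, a
# trajectory moving straight along +x yields yaw == 0 at every step.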
class PlanningMetric():
def __init__(
self,
n_future=6,
compute_on_step: bool = False,
):
self.W = 1.85 # ego vehicle width in meters
self.H = 4.084 # ego vehicle length in meters
self.n_future = n_future
self.reset()
def reset(self):
self.obj_col = torch.zeros(self.n_future)
self.obj_box_col = torch.zeros(self.n_future)
self.L2 = torch.zeros(self.n_future)
self.total = torch.tensor(0)
def evaluate_single_coll(self, traj, fut_boxes):
n_future = traj.shape[0]
yaw = get_yaw(traj)
ego_box = traj.new_zeros((n_future, 7))
ego_box[:, :2] = traj
ego_box[:, 3:6] = ego_box.new_tensor([self.H, self.W, 1.56])
ego_box[:, 6] = yaw
collision = torch.zeros(n_future, dtype=torch.bool)
for t in range(n_future):
ego_box_t = ego_box[t].clone()
boxes = fut_boxes[t][0].clone()
collision[t] = check_collision(ego_box_t, boxes)
return collision
def evaluate_coll(self, trajs, gt_trajs, fut_boxes):
B, n_future, _ = trajs.shape
trajs = trajs * torch.tensor([-1, 1], device=trajs.device)
gt_trajs = gt_trajs * torch.tensor([-1, 1], device=gt_trajs.device)
obj_coll_sum = torch.zeros(n_future, device=trajs.device)
obj_box_coll_sum = torch.zeros(n_future, device=trajs.device)
assert B == 1, 'only support bs=1'
for i in range(B):
gt_box_coll = self.evaluate_single_coll(gt_trajs[i], fut_boxes)
box_coll = self.evaluate_single_coll(trajs[i], fut_boxes)
box_coll = torch.logical_and(box_coll, torch.logical_not(gt_box_coll))
obj_coll_sum += gt_box_coll.long()
obj_box_coll_sum += box_coll.long()
return obj_coll_sum, obj_box_coll_sum
def compute_L2(self, trajs, gt_trajs, gt_trajs_mask):
'''
trajs: torch.Tensor (B, n_future, 3)
gt_trajs: torch.Tensor (B, n_future, 3)
gt_trajs_mask: torch.Tensor (B, n_future, 2), 1 for valid timesteps
'''
return torch.sqrt((((trajs[:, :, :2] - gt_trajs[:, :, :2]) ** 2) * gt_trajs_mask).sum(dim=-1))
def update(self, trajs, gt_trajs, gt_trajs_mask, fut_boxes):
assert trajs.shape == gt_trajs.shape
trajs[..., 0] = - trajs[..., 0]
gt_trajs[..., 0] = - gt_trajs[..., 0]
L2 = self.compute_L2(trajs, gt_trajs, gt_trajs_mask)
obj_coll_sum, obj_box_coll_sum = self.evaluate_coll(trajs[:,:,:2], gt_trajs[:,:,:2], fut_boxes)
self.obj_col += obj_coll_sum
self.obj_box_col += obj_box_coll_sum
self.L2 += L2.sum(dim=0)
self.total += len(trajs)
def compute(self):
return {
'obj_col': self.obj_col / self.total,
'obj_box_col': self.obj_box_col / self.total,
'L2': self.L2 / self.total
}
def planning_eval(results, eval_config, logger):
dataset = build_dataset(eval_config)
dataloader = build_dataloader(
dataset, samples_per_gpu=1, workers_per_gpu=1, shuffle=False, dist=False)
planning_metrics = PlanningMetric()
for i, data in enumerate(tqdm(dataloader)):
sdc_planning = data['gt_ego_fut_trajs'].cumsum(dim=-2).unsqueeze(1)
sdc_planning_mask = data['gt_ego_fut_masks'].unsqueeze(-1).repeat(1, 1, 2).unsqueeze(1)
command = data['gt_ego_fut_cmd'].argmax(dim=-1).item()
fut_boxes = data['fut_boxes']
if not sdc_planning_mask.all(): # skip samples with incomplete ground truth
continue
res = results[i]
pred_sdc_traj = res['img_bbox']['final_planning'].unsqueeze(0)
planning_metrics.update(pred_sdc_traj[:, :6, :2], sdc_planning[0,:, :6, :2], sdc_planning_mask[0,:, :6, :2], fut_boxes)
planning_results = planning_metrics.compute()
planning_metrics.reset()
from prettytable import PrettyTable
planning_tab = PrettyTable()
metric_dict = {}
planning_tab.field_names = [
"metrics", "0.5s", "1.0s", "1.5s", "2.0s", "2.5s", "3.0s", "avg"]
for key in planning_results.keys():
value = planning_results[key].tolist()
new_values = []
for i in range(len(value)):
new_values.append(np.array(value[:i+1]).mean())
value = new_values
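# new_values holds cumulative means up to each horizon; the reported avg is
# the mean of the 1s/2s/3s entries (indices 1, 3, 5), following the
# UniAD-style planning protocol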
avg = [value[1], value[3], value[5]]
avg = sum(avg) / len(avg)
value.append(avg)
metric_dict[key] = avg
row_value = []
row_value.append(key)
for i in range(len(value)):
if 'col' in key:
row_value.append('%.3f' % float(value[i]*100) + '%')
else:
row_value.append('%.4f' % float(value[i]))
planning_tab.add_row(row_value)
print_log('\n'+str(planning_tab), logger=logger)
return metric_dict
from shapely.geometry import LineString, box, Polygon
from shapely import ops, strtree
import numpy as np
from nuscenes.map_expansion.map_api import NuScenesMap, NuScenesMapExplorer
from nuscenes.eval.common.utils import quaternion_yaw
from pyquaternion import Quaternion
from .utils import split_collections, get_drivable_area_contour, \
get_ped_crossing_contour
from numpy.typing import NDArray
from typing import Dict, List, Tuple, Union
class NuscMapExtractor(object):
"""NuScenes map ground-truth extractor.
Args:
data_root (str): path to nuScenes dataset
roi_size (tuple or list): bev range
"""
def __init__(self, data_root: str, roi_size: Union[List, Tuple]) -> None:
self.roi_size = roi_size
self.MAPS = ['boston-seaport', 'singapore-hollandvillage',
'singapore-onenorth', 'singapore-queenstown']
self.nusc_maps = {}
self.map_explorer = {}
for loc in self.MAPS:
self.nusc_maps[loc] = NuScenesMap(
dataroot=data_root, map_name=loc)
self.map_explorer[loc] = NuScenesMapExplorer(self.nusc_maps[loc])
# local patch in nuScenes format
self.local_patch = box(-roi_size[0] / 2, -roi_size[1] / 2,
roi_size[0] / 2, roi_size[1] / 2)
def _union_ped(self, ped_geoms: List[Polygon]) -> List[Polygon]:
''' Merge close ped crossings.
Args:
ped_geoms (list): list of Polygon
Returns:
results (List): merged ped crossing polygons
'''
def get_rec_direction(geom):
rect = geom.minimum_rotated_rectangle
rect_v_p = np.array(rect.exterior.coords)[:3]
rect_v = rect_v_p[1:]-rect_v_p[:-1]
v_len = np.linalg.norm(rect_v, axis=-1)
longest_v_i = v_len.argmax()
return rect_v[longest_v_i], v_len[longest_v_i]
tree = strtree.STRtree(ped_geoms)
# NOTE: relies on shapely 1.x, where STRtree.query returns geometry objects
# (shapely 2.x returns integer indices)
index_by_id = dict((id(pt), i) for i, pt in enumerate(ped_geoms))
final_pgeom = []
remain_idx = [i for i in range(len(ped_geoms))]
for i, pgeom in enumerate(ped_geoms):
if i not in remain_idx:
continue
# update
remain_idx.pop(remain_idx.index(i))
pgeom_v, pgeom_v_norm = get_rec_direction(pgeom)
final_pgeom.append(pgeom)
for o in tree.query(pgeom):
o_idx = index_by_id[id(o)]
if o_idx not in remain_idx:
continue
o_v, o_v_norm = get_rec_direction(o)
cos = pgeom_v.dot(o_v)/(pgeom_v_norm*o_v_norm)
if 1 - np.abs(cos) < 0.01: # theta < 8 degrees.
final_pgeom[-1] =\
final_pgeom[-1].union(o)
# update
remain_idx.pop(remain_idx.index(o_idx))
results = []
for p in final_pgeom:
results.extend(split_collections(p))
return results
def get_map_geom(self,
location: str,
translation: Union[List, NDArray],
rotation: Union[List, NDArray]) -> Dict[str, List[Union[LineString, Polygon]]]:
''' Extract geometries given `location` and the self pose ("self" may be the lidar or ego frame).
Args:
location (str): city name
translation (array): self2global translation, shape (3,)
rotation (array): self2global quaternion, shape (4,)
Returns:
geometries (Dict): extracted geometries by category.
'''
# (center_x, center_y, len_y, len_x) in nuscenes format
patch_box = (translation[0], translation[1],
self.roi_size[1], self.roi_size[0])
rotation = Quaternion(rotation)
yaw = quaternion_yaw(rotation) / np.pi * 180
# get dividers
lane_dividers = self.map_explorer[location]._get_layer_line(
patch_box, yaw, 'lane_divider')
road_dividers = self.map_explorer[location]._get_layer_line(
patch_box, yaw, 'road_divider')
all_dividers = []
for line in lane_dividers + road_dividers:
all_dividers += split_collections(line)
# get ped crossings
ped_crossings = []
ped = self.map_explorer[location]._get_layer_polygon(
patch_box, yaw, 'ped_crossing')
for p in ped:
ped_crossings += split_collections(p)
# some ped crossings are split into several small parts
# we need to merge them
ped_crossings = self._union_ped(ped_crossings)
ped_crossing_lines = []
for p in ped_crossings:
# extract exteriors to get a closed polyline
line = get_ped_crossing_contour(p, self.local_patch)
if line is not None:
ped_crossing_lines.append(line)
# get boundaries
# we take the union of road segments and lanes as drivable areas
# we don't take drivable area layer in nuScenes since its definition may be ambiguous
road_segments = self.map_explorer[location]._get_layer_polygon(
patch_box, yaw, 'road_segment')
lanes = self.map_explorer[location]._get_layer_polygon(
patch_box, yaw, 'lane')
union_roads = ops.unary_union(road_segments)
union_lanes = ops.unary_union(lanes)
drivable_areas = ops.unary_union([union_roads, union_lanes])
drivable_areas = split_collections(drivable_areas)
# boundaries are defined as the contour of drivable areas
boundaries = get_drivable_area_contour(drivable_areas, self.roi_size)
return dict(
divider=all_dividers, # List[LineString]
ped_crossing=ped_crossing_lines, # List[LineString]
boundary=boundaries, # List[LineString]
drivable_area=drivable_areas, # List[Polygon],
)
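# Usage sketch (illustrative; the pose values are placeholders):
#   extractor = NuscMapExtractor('data/nuscenes', roi_size=(60, 30))
#   geoms = extractor.get_map_geom('boston-seaport',
#       translation=[600.0, 1600.0, 0.0], rotation=[1.0, 0.0, 0.0, 0.0])
#   geoms['divider']  # List[LineString] in the local (self) frame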
from shapely.geometry import LineString, box, Polygon, LinearRing
from shapely.geometry.base import BaseGeometry
from shapely import ops
import numpy as np
from scipy.spatial import distance
from typing import List, Optional, Tuple
from numpy.typing import NDArray
def split_collections(geom: BaseGeometry) -> List[BaseGeometry]:
''' Split multi-part geometries into a list, keeping only valid, non-empty geometries.
Args:
geom (BaseGeometry): geometry to be split and validated.
Returns:
geometries (List): list of valid geometries.
'''
assert geom.geom_type in ['MultiLineString', 'LineString', 'MultiPolygon',
'Polygon', 'GeometryCollection'], f"got geom type {geom.geom_type}"
if 'Multi' in geom.geom_type:
outs = []
for g in geom.geoms:
if g.is_valid and not g.is_empty:
outs.append(g)
return outs
else:
if geom.is_valid and not geom.is_empty:
return [geom,]
else:
return []
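# Example: a two-part MultiLineString is split into its parts, while empty or
# invalid members are dropped:
#   mls = ops.unary_union([LineString([(0, 0), (1, 0)]), LineString([(2, 0), (3, 0)])])
#   split_collections(mls)  # -> [<LineString>, <LineString>]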
def get_drivable_area_contour(drivable_areas: List[Polygon],
roi_size: Tuple) -> List[LineString]:
''' Extract drivable area contours to get list of boundaries.
Args:
drivable_areas (list): list of drivable areas.
roi_size (tuple): bev range size
Returns:
boundaries (List): list of boundaries.
'''
max_x = roi_size[0] / 2
max_y = roi_size[1] / 2
# a bit smaller than roi to avoid unexpected boundaries on edges
local_patch = box(-max_x + 0.2, -max_y + 0.2, max_x - 0.2, max_y - 0.2)
exteriors = []
interiors = []
for poly in drivable_areas:
exteriors.append(poly.exterior)
for inter in poly.interiors:
interiors.append(inter)
results = []
for ext in exteriors:
# NOTE: we make sure all exteriors are clock-wise
# such that each boundary's right-hand-side is drivable area
# and left-hand-side is walk way
if ext.is_ccw:
ext = LinearRing(list(ext.coords)[::-1])
lines = ext.intersection(local_patch)
if lines.geom_type == 'MultiLineString':
lines = ops.linemerge(lines)
assert lines.geom_type in ['MultiLineString', 'LineString']
results.extend(split_collections(lines))
for inter in interiors:
# NOTE: we make sure all interiors are counter-clock-wise
if not inter.is_ccw:
inter = LinearRing(list(inter.coords)[::-1])
lines = inter.intersection(local_patch)
if lines.geom_type == 'MultiLineString':
lines = ops.linemerge(lines)
assert lines.geom_type in ['MultiLineString', 'LineString']
results.extend(split_collections(lines))
return results
def get_ped_crossing_contour(polygon: Polygon,
local_patch: box) -> Optional[LineString]:
''' Extract ped crossing contours to get a closed polyline.
Different from `get_drivable_area_contour`, this function ensures a closed polyline.
Args:
polygon (Polygon): ped crossing polygon to be extracted.
local_patch (tuple): local patch params
Returns:
line (LineString): a closed line
'''
ext = polygon.exterior
if not ext.is_ccw:
ext = LinearRing(list(ext.coords)[::-1])
lines = ext.intersection(local_patch)
if lines.geom_type != 'LineString':
# remove points in intersection results
lines = [l for l in lines.geoms if l.geom_type != 'Point']
lines = ops.linemerge(lines)
# same instance but not connected.
if lines.geom_type != 'LineString':
ls = []
for l in lines.geoms:
ls.append(np.array(l.coords))
lines = np.concatenate(ls, axis=0)
lines = LineString(lines)
if not lines.is_empty:
return lines
return None
from .transform import (
InstanceNameFilter,
CircleObjectRangeFilter,
NormalizeMultiviewImage,
NuScenesSparse4DAdaptor,
MultiScaleDepthMapGenerator,
)
from .augment import (
ResizeCropFlipImage,
BBoxRotation,
PhotoMetricDistortionMultiViewImage,
)
from .loading import LoadMultiViewImageFromFiles, LoadPointsFromFile
from .vectorize import VectorizeMap
__all__ = [
"InstanceNameFilter",
"ResizeCropFlipImage",
"BBoxRotation",
"CircleObjectRangeFilter",
"MultiScaleDepthMapGenerator",
"NormalizeMultiviewImage",
"PhotoMetricDistortionMultiViewImage",
"NuScenesSparse4DAdaptor",
"LoadMultiViewImageFromFiles",
"LoadPointsFromFile",
"VectorizeMap",
]
import torch
import numpy as np
from numpy import random
import mmcv
from mmdet.datasets.builder import PIPELINES
from PIL import Image
@PIPELINES.register_module()
class ResizeCropFlipImage(object):
def __call__(self, results):
aug_config = results.get("aug_config")
if aug_config is None:
return results
imgs = results["img"]
N = len(imgs)
new_imgs = []
for i in range(N):
img, mat = self._img_transform(
np.uint8(imgs[i]), aug_config,
)
new_imgs.append(np.array(img).astype(np.float32))
results["lidar2img"][i] = mat @ results["lidar2img"][i]
if "cam_intrinsic" in results:
results["cam_intrinsic"][i][:3, :3] *= aug_config["resize"]
# results["cam_intrinsic"][i][:3, :3] = (
# mat[:3, :3] @ results["cam_intrinsic"][i][:3, :3]
# )
results["img"] = new_imgs
results["img_shape"] = [x.shape[:2] for x in new_imgs]
return results
def _img_transform(self, img, aug_configs):
H, W = img.shape[:2]
resize = aug_configs.get("resize", 1)
resize_dims = (int(W * resize), int(H * resize))
crop = aug_configs.get("crop", [0, 0, *resize_dims])
flip = aug_configs.get("flip", False)
rotate = aug_configs.get("rotate", 0)
origin_dtype = img.dtype
if origin_dtype != np.uint8:
min_value = img.min()
max_value = img.max()
scale = 255 / (max_value - min_value)
img = (img - min_value) * scale
img = np.uint8(img)
img = Image.fromarray(img)
img = img.resize(resize_dims).crop(crop)
if flip:
img = img.transpose(method=Image.FLIP_LEFT_RIGHT)
img = img.rotate(rotate)
img = np.array(img).astype(np.float32)
if origin_dtype != np.uint8:
img = img.astype(np.float32)
img = img / scale + min_value
transform_matrix = np.eye(3)
transform_matrix[:2, :2] *= resize
transform_matrix[:2, 2] -= np.array(crop[:2])
if flip:
flip_matrix = np.array(
[[-1, 0, crop[2] - crop[0]], [0, 1, 0], [0, 0, 1]]
)
transform_matrix = flip_matrix @ transform_matrix
rotate = rotate / 180 * np.pi
rot_matrix = np.array(
[
[np.cos(rotate), np.sin(rotate), 0],
[-np.sin(rotate), np.cos(rotate), 0],
[0, 0, 1],
]
)
rot_center = np.array([crop[2] - crop[0], crop[3] - crop[1]]) / 2
rot_matrix[:2, 2] = -rot_matrix[:2, :2] @ rot_center + rot_center
transform_matrix = rot_matrix @ transform_matrix
extend_matrix = np.eye(4)
extend_matrix[:3, :3] = transform_matrix
return img, extend_matrix
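# The returned 4x4 matrix composes, in pixel space: scaling (resize), then
# translation by the crop offset, then horizontal flip, then rotation about
# the crop center. Left-multiplying lidar2img by it (as done in __call__)
# keeps 3D-to-pixel projections consistent with the augmented image.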
@PIPELINES.register_module()
class BBoxRotation(object):
def __call__(self, results):
angle = results["aug_config"]["rotate_3d"]
rot_cos = np.cos(angle)
rot_sin = np.sin(angle)
rot_mat = np.array(
[
[rot_cos, -rot_sin, 0, 0],
[rot_sin, rot_cos, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1],
]
)
rot_mat_inv = np.linalg.inv(rot_mat)
num_view = len(results["lidar2img"])
for view in range(num_view):
results["lidar2img"][view] = (
results["lidar2img"][view] @ rot_mat_inv
)
if "lidar2global" in results:
results["lidar2global"] = results["lidar2global"] @ rot_mat_inv
if "gt_bboxes_3d" in results:
results["gt_bboxes_3d"] = self.box_rotate(
results["gt_bboxes_3d"], angle
)
return results
@staticmethod
def box_rotate(bbox_3d, angle):
rot_cos = np.cos(angle)
rot_sin = np.sin(angle)
rot_mat_T = np.array(
[[rot_cos, rot_sin, 0], [-rot_sin, rot_cos, 0], [0, 0, 1]]
)
bbox_3d[:, :3] = bbox_3d[:, :3] @ rot_mat_T
bbox_3d[:, 6] += angle
if bbox_3d.shape[-1] > 7:
vel_dims = bbox_3d[:, 7:].shape[-1]
bbox_3d[:, 7:] = bbox_3d[:, 7:] @ rot_mat_T[:vel_dims, :vel_dims]
return bbox_3d
@PIPELINES.register_module()
class PhotoMetricDistortionMultiViewImage:
"""Apply photometric distortion to image sequentially, every transformation
is applied with a probability of 0.5. The position of random contrast is in
second or second to last.
1. random brightness
2. random contrast (mode 0)
3. convert color from BGR to HSV
4. random saturation
5. random hue
6. convert color from HSV to BGR
7. random contrast (mode 1)
8. randomly swap channels
Args:
brightness_delta (int): delta of brightness.
contrast_range (tuple): range of contrast.
saturation_range (tuple): range of saturation.
hue_delta (int): delta of hue.
"""
def __init__(
self,
brightness_delta=32,
contrast_range=(0.5, 1.5),
saturation_range=(0.5, 1.5),
hue_delta=18,
):
self.brightness_delta = brightness_delta
self.contrast_lower, self.contrast_upper = contrast_range
self.saturation_lower, self.saturation_upper = saturation_range
self.hue_delta = hue_delta
def __call__(self, results):
"""Call function to perform photometric distortion on images.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Result dict with images distorted.
"""
imgs = results["img"]
new_imgs = []
for img in imgs:
assert img.dtype == np.float32, (
"PhotoMetricDistortion needs the input image of dtype np.float32,"
' please set "to_float32=True" in "LoadImageFromFile" pipeline'
)
# random brightness
if random.randint(2):
delta = random.uniform(
-self.brightness_delta, self.brightness_delta
)
img += delta
# mode == 0 --> do random contrast first
# mode == 1 --> do random contrast last
mode = random.randint(2)
if mode == 1:
if random.randint(2):
alpha = random.uniform(
self.contrast_lower, self.contrast_upper
)
img *= alpha
# convert color from BGR to HSV
img = mmcv.bgr2hsv(img)
# random saturation
if random.randint(2):
img[..., 1] *= random.uniform(
self.saturation_lower, self.saturation_upper
)
# random hue
if random.randint(2):
img[..., 0] += random.uniform(-self.hue_delta, self.hue_delta)
img[..., 0][img[..., 0] > 360] -= 360
img[..., 0][img[..., 0] < 0] += 360
# convert color from HSV to BGR
img = mmcv.hsv2bgr(img)
# random contrast
if mode == 0:
if random.randint(2):
alpha = random.uniform(
self.contrast_lower, self.contrast_upper
)
img *= alpha
# randomly swap channels
if random.randint(2):
img = img[..., random.permutation(3)]
new_imgs.append(img)
results["img"] = new_imgs
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f"(\nbrightness_delta={self.brightness_delta},\n"
repr_str += "contrast_range="
repr_str += f"{(self.contrast_lower, self.contrast_upper)},\n"
repr_str += "saturation_range="
repr_str += f"{(self.saturation_lower, self.saturation_upper)},\n"
repr_str += f"hue_delta={self.hue_delta})"
return repr_str
import numpy as np
import mmcv
from mmdet.datasets.builder import PIPELINES
@PIPELINES.register_module()
class LoadMultiViewImageFromFiles(object):
"""Load multi channel images from a list of separate channel files.
Expects results['img_filename'] to be a list of filenames.
Args:
to_float32 (bool, optional): Whether to convert the img to float32.
Defaults to False.
color_type (str, optional): Color type of the file.
Defaults to 'unchanged'.
"""
def __init__(self, to_float32=False, color_type="unchanged"):
self.to_float32 = to_float32
self.color_type = color_type
def __call__(self, results):
"""Call function to load multi-view image from files.
Args:
results (dict): Result dict containing multi-view image filenames.
Returns:
dict: The result dict containing the multi-view image data.
Added keys and values are described below.
- filename (str): Multi-view image filenames.
- img (np.ndarray): Multi-view image arrays.
- img_shape (tuple[int]): Shape of multi-view image arrays.
- ori_shape (tuple[int]): Shape of original image arrays.
- pad_shape (tuple[int]): Shape of padded image arrays.
- scale_factor (float): Scale factor.
- img_norm_cfg (dict): Normalization configuration of images.
"""
filename = results["img_filename"]
# img is of shape (h, w, c, num_views)
img = np.stack(
[mmcv.imread(name, self.color_type) for name in filename], axis=-1
)
if self.to_float32:
img = img.astype(np.float32)
results["filename"] = filename
# unravel to list, see `DefaultFormatBundle` in formatting.py
# which will transpose each image separately and then stack into array
results["img"] = [img[..., i] for i in range(img.shape[-1])]
results["img_shape"] = img.shape
results["ori_shape"] = img.shape
# Set initial values for default meta_keys
results["pad_shape"] = img.shape
results["scale_factor"] = 1.0
num_channels = 1 if len(img.shape) < 3 else img.shape[2]
results["img_norm_cfg"] = dict(
mean=np.zeros(num_channels, dtype=np.float32),
std=np.ones(num_channels, dtype=np.float32),
to_rgb=False,
)
return results
def __repr__(self):
"""str: Return a string that describes the module."""
repr_str = self.__class__.__name__
repr_str += f"(to_float32={self.to_float32}, "
repr_str += f"color_type='{self.color_type}')"
return repr_str
@PIPELINES.register_module()
class LoadPointsFromFile(object):
"""Load Points From File.
Load points from file.
Args:
coord_type (str): The type of coordinates of points cloud.
Available options includes:
- 'LIDAR': Points in LiDAR coordinates.
- 'DEPTH': Points in depth coordinates, usually for indoor dataset.
- 'CAMERA': Points in camera coordinates.
load_dim (int, optional): The dimension of the loaded points.
Defaults to 6.
use_dim (list[int], optional): Which dimensions of the points to use.
Defaults to [0, 1, 2]. For KITTI dataset, set use_dim=4
or use_dim=[0, 1, 2, 3] to use the intensity dimension.
shift_height (bool, optional): Whether to use shifted height.
Defaults to False.
use_color (bool, optional): Whether to use color features.
Defaults to False.
file_client_args (dict, optional): Config dict of file clients,
refer to
https://github.com/open-mmlab/mmcv/blob/master/mmcv/fileio/file_client.py
for more details. Defaults to dict(backend='disk').
"""
def __init__(
self,
coord_type,
load_dim=6,
use_dim=[0, 1, 2],
shift_height=False,
use_color=False,
file_client_args=dict(backend="disk"),
):
self.shift_height = shift_height
self.use_color = use_color
if isinstance(use_dim, int):
use_dim = list(range(use_dim))
assert (
max(use_dim) < load_dim
), f"Expect all used dimensions < {load_dim}, got {use_dim}"
assert coord_type in ["CAMERA", "LIDAR", "DEPTH"]
self.coord_type = coord_type
self.load_dim = load_dim
self.use_dim = use_dim
self.file_client_args = file_client_args.copy()
self.file_client = None
def _load_points(self, pts_filename):
"""Private function to load point clouds data.
Args:
pts_filename (str): Filename of point clouds data.
Returns:
np.ndarray: An array containing point clouds data.
"""
if self.file_client is None:
self.file_client = mmcv.FileClient(**self.file_client_args)
try:
pts_bytes = self.file_client.get(pts_filename)
points = np.frombuffer(pts_bytes, dtype=np.float32)
except ConnectionError:
mmcv.check_file_exist(pts_filename)
if pts_filename.endswith(".npy"):
points = np.load(pts_filename)
else:
points = np.fromfile(pts_filename, dtype=np.float32)
return points
def __call__(self, results):
"""Call function to load points data from file.
Args:
results (dict): Result dict containing point clouds data.
Returns:
dict: The result dict containing the point clouds data.
Added key and value are described below.
- points (:obj:`BasePoints`): Point clouds data.
"""
pts_filename = results["pts_filename"]
points = self._load_points(pts_filename)
points = points.reshape(-1, self.load_dim)
points = points[:, self.use_dim]
attribute_dims = None
if self.shift_height:
floor_height = np.percentile(points[:, 2], 0.99)
height = points[:, 2] - floor_height
points = np.concatenate(
[points[:, :3], np.expand_dims(height, 1), points[:, 3:]], 1
)
attribute_dims = dict(height=3)
if self.use_color:
assert len(self.use_dim) >= 6
if attribute_dims is None:
attribute_dims = dict()
attribute_dims.update(
dict(
color=[
points.shape[1] - 3,
points.shape[1] - 2,
points.shape[1] - 1,
]
)
)
results["points"] = points
return results
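# Usage sketch (illustrative; nuScenes lidar .bin files store 5 floats per
# point, of which x/y/z/intensity are typically used):
#   loader = LoadPointsFromFile(coord_type='LIDAR', load_dim=5, use_dim=[0, 1, 2, 3])
#   results = loader(dict(pts_filename='data/sample.bin'))
#   results['points'].shape  # -> (N, 4)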
from .group_sampler import DistributedGroupSampler
from .distributed_sampler import DistributedSampler
from .sampler import SAMPLER, build_sampler
from .group_in_batch_sampler import (
GroupInBatchSampler,
)
from mmcv.utils.registry import Registry, build_from_cfg
SAMPLER = Registry("sampler")
def build_sampler(cfg, default_args):
return build_from_cfg(cfg, SAMPLER, default_args)
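# Usage sketch (hypothetical config): a sampler class registered via
# @SAMPLER.register_module() can then be built from a config dict, e.g.
#   sampler = build_sampler(dict(type='DistributedSampler'), dict(dataset=dataset))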