Unverified Commit b64d9ca3 authored by Wenhai Wang, committed by GitHub

Merge pull request #105 from zhiqi-li/occupancy

support occupancy prediction
parents bdd98bcb df3c64a9
import torch
from mmdet.core.bbox.builder import BBOX_ASSIGNERS
from mmdet.core.bbox.assigners import AssignResult
from mmdet.core.bbox.assigners import BaseAssigner
from mmdet.core.bbox.match_costs import build_match_cost
from mmdet.models.utils.transformer import inverse_sigmoid
from projects.mmdet3d_plugin.core.bbox.util import normalize_bbox
try:
from scipy.optimize import linear_sum_assignment
except ImportError:
linear_sum_assignment = None
@BBOX_ASSIGNERS.register_module()
class HungarianAssigner3D(BaseAssigner):
"""Computes one-to-one matching between predictions and ground truth.
This class computes an assignment between the targets and the predictions
    based on the costs. The cost is a weighted sum of three components:
    classification cost, regression L1 cost and regression IoU cost. The
    targets don't include the no_object class, so generally there are more
    predictions than targets. After the one-to-one matching, the unmatched
    predictions are treated as background. Thus each query prediction will
    be assigned `0` or a positive integer indicating the ground truth index:
    - 0: negative sample, no assigned gt
    - positive integer: positive sample, index (1-based) of assigned gt
    Args:
        cls_cost (dict, optional): Config of the classification match cost.
            Default: dict(type='ClassificationCost', weight=1.).
        reg_cost (dict, optional): Config of the regression L1 match cost.
            Default: dict(type='BBoxL1Cost', weight=1.0).
        iou_cost (dict, optional): Config of the regression IoU match cost.
            Default: dict(type='IoUCost', weight=0.0). Built but not added
            to the final cost below.
        pc_range (list[float], optional): Point cloud range
            [x_min, y_min, z_min, x_max, y_max, z_max] used to normalize
            ground truth boxes. Default: None.
"""
def __init__(self,
cls_cost=dict(type='ClassificationCost', weight=1.),
reg_cost=dict(type='BBoxL1Cost', weight=1.0),
iou_cost=dict(type='IoUCost', weight=0.0),
pc_range=None):
self.cls_cost = build_match_cost(cls_cost)
self.reg_cost = build_match_cost(reg_cost)
self.iou_cost = build_match_cost(iou_cost)
self.pc_range = pc_range
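    # Example (hypothetical, for illustration only) of how this assigner
    # might be configured in a BEVFormer-style head config; the weights and
    # pc_range are illustrative, not prescribed by this file:
    #
    #   assigner=dict(
    #       type='HungarianAssigner3D',
    #       cls_cost=dict(type='FocalLossCost', weight=2.0),
    #       reg_cost=dict(type='BBox3DL1Cost', weight=0.25),
    #       iou_cost=dict(type='IoUCost', weight=0.0),  # placeholder cost
    #       pc_range=point_cloud_range)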
def assign(self,
bbox_pred,
cls_pred,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
eps=1e-7):
"""Computes one-to-one matching based on the weighted costs.
        This method assigns each query prediction to a ground truth or
        background. `assigned_gt_inds` of -1 means don't care,
        0 means negative sample, and a positive number is the index (1-based)
        of the assigned gt.
        The assignment is done in the following steps; the order matters.
        1. assign every prediction to -1
        2. compute the weighted costs
        3. do Hungarian matching on CPU based on the costs
        4. assign all to 0 (background) first, then for each matched pair
           between predictions and gts, treat this prediction as foreground
           and assign the corresponding gt index (plus 1) to it.
        Args:
            bbox_pred (Tensor): Predicted 3D boxes in the normalized format
                (cx, cy, w, l, cz, h, rot_sine, rot_cosine, vx, vy).
                Shape [num_query, 10].
            cls_pred (Tensor): Predicted classification logits, shape
                [num_query, num_class].
            gt_bboxes (Tensor): Ground truth 3D boxes with unnormalized
                coordinates (cx, cy, cz, w, l, h, rot, vx, vy).
                Shape [num_gt, 9].
            gt_labels (Tensor): Labels of `gt_bboxes`, shape (num_gt,).
            gt_bboxes_ignore (Tensor, optional): Ground truth bboxes that are
                labelled as `ignored`. Default None.
            eps (int | float, optional): A value added to the denominator for
                numerical stability. Default 1e-7.
Returns:
:obj:`AssignResult`: The assigned result.
"""
assert gt_bboxes_ignore is None, \
'Only case when gt_bboxes_ignore is None is supported.'
num_gts, num_bboxes = gt_bboxes.size(0), bbox_pred.size(0)
# 1. assign -1 by default
assigned_gt_inds = bbox_pred.new_full((num_bboxes, ),
-1,
dtype=torch.long)
assigned_labels = bbox_pred.new_full((num_bboxes, ),
-1,
dtype=torch.long)
if num_gts == 0 or num_bboxes == 0:
# No ground truth or boxes, return empty assignment
if num_gts == 0:
# No ground truth, assign all to background
assigned_gt_inds[:] = 0
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
# 2. compute the weighted costs
        # classification and bbox cost
cls_cost = self.cls_cost(cls_pred, gt_labels)
# regression L1 cost
normalized_gt_bboxes = normalize_bbox(gt_bboxes, self.pc_range)
reg_cost = self.reg_cost(bbox_pred[:, :8], normalized_gt_bboxes[:, :8])
# weighted sum of above two costs
cost = cls_cost + reg_cost
# 3. do Hungarian matching on CPU using linear_sum_assignment
cost = cost.detach().cpu()
if linear_sum_assignment is None:
raise ImportError('Please run "pip install scipy" '
'to install scipy first.')
matched_row_inds, matched_col_inds = linear_sum_assignment(cost)
matched_row_inds = torch.from_numpy(matched_row_inds).to(
bbox_pred.device)
matched_col_inds = torch.from_numpy(matched_col_inds).to(
bbox_pred.device)
# 4. assign backgrounds and foregrounds
# assign all indices to backgrounds first
assigned_gt_inds[:] = 0
# assign foregrounds based on matching results
assigned_gt_inds[matched_row_inds] = matched_col_inds + 1
assigned_labels[matched_row_inds] = gt_labels[matched_col_inds]
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
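# Minimal usage sketch (shapes and values are illustrative): match 900 query
# predictions against 5 ground truth boxes.
#
#   assigner = HungarianAssigner3D(
#       reg_cost=dict(type='BBox3DL1Cost', weight=0.25),
#       pc_range=[-51.2, -51.2, -5.0, 51.2, 51.2, 3.0])
#   result = assigner.assign(bbox_pred,   # [900, 10] normalized predictions
#                            cls_pred,    # [900, num_classes] logits
#                            gt_bboxes,   # [5, 9] unnormalized gt boxes
#                            gt_labels)   # [5] gt class indices
#   # result.gt_inds: 0 for background, i+1 for the i-th matched gt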
from .nms_free_coder import NMSFreeCoder
__all__ = ['NMSFreeCoder']
import torch
from mmdet.core.bbox import BaseBBoxCoder
from mmdet.core.bbox.builder import BBOX_CODERS
from projects.mmdet3d_plugin.core.bbox.util import denormalize_bbox
import numpy as np
@BBOX_CODERS.register_module()
class NMSFreeCoder(BaseBBoxCoder):
"""Bbox coder for NMS-free detector.
Args:
        pc_range (list[float]): Range of point cloud.
        voxel_size (list[float], optional): Size of a single voxel.
            Default: None.
        post_center_range (list[float]): Limit of the center.
            Default: None.
        max_num (int): Max number of boxes to be kept. Default: 100.
        score_threshold (float): Threshold to filter boxes based on score.
            Default: None.
        num_classes (int): Number of classes. Default: 10.
"""
def __init__(self,
pc_range,
voxel_size=None,
post_center_range=None,
max_num=100,
score_threshold=None,
num_classes=10):
self.pc_range = pc_range
self.voxel_size = voxel_size
self.post_center_range = post_center_range
self.max_num = max_num
self.score_threshold = score_threshold
self.num_classes = num_classes
def encode(self):
pass
def decode_single(self, cls_scores, bbox_preds):
"""Decode bboxes.
Args:
cls_scores (Tensor): Outputs from the classification head, \
shape [num_query, cls_out_channels]. Note \
                cls_out_channels should include background.
bbox_preds (Tensor): Outputs from the regression \
head with normalized coordinate format (cx, cy, w, l, cz, h, rot_sine, rot_cosine, vx, vy). \
                Shape [num_query, 10].
Returns:
list[dict]: Decoded boxes.
"""
max_num = self.max_num
cls_scores = cls_scores.sigmoid()
        scores, indices = cls_scores.view(-1).topk(max_num)
        labels = indices % self.num_classes
        bbox_index = indices // self.num_classes
        bbox_preds = bbox_preds[bbox_index]
final_box_preds = denormalize_bbox(bbox_preds, self.pc_range)
final_scores = scores
final_preds = labels
        # use the score threshold; if no box passes it, progressively relax
        # the threshold so that at least one prediction is kept
        if self.score_threshold is not None:
            thresh_mask = final_scores > self.score_threshold
            tmp_score = self.score_threshold
            while thresh_mask.sum() == 0:
                tmp_score *= 0.9
                if tmp_score < 0.01:
                    thresh_mask = final_scores > -1
                    break
                thresh_mask = final_scores >= tmp_score
if self.post_center_range is not None:
self.post_center_range = torch.tensor(
self.post_center_range, device=scores.device)
mask = (final_box_preds[..., :3] >=
self.post_center_range[:3]).all(1)
mask &= (final_box_preds[..., :3] <=
self.post_center_range[3:]).all(1)
if self.score_threshold:
mask &= thresh_mask
boxes3d = final_box_preds[mask]
scores = final_scores[mask]
labels = final_preds[mask]
predictions_dict = {
'bboxes': boxes3d,
'scores': scores,
'labels': labels
}
else:
raise NotImplementedError(
'Need to reorganize output as a batch, only '
'support post_center_range is not None for now!')
return predictions_dict
def decode(self, preds_dicts):
"""Decode bboxes.
        Args:
            preds_dicts (dict): Prediction dict with keys:
                - all_cls_scores (Tensor): Classification outputs of all \
                  decoder layers, shape [nb_dec, bs, num_query, \
                  cls_out_channels]. Note cls_out_channels should include \
                  background.
                - all_bbox_preds (Tensor): Sigmoid outputs of all decoder \
                  layers in the normalized format (cx, cy, w, l, cz, h, \
                  rot_sine, rot_cosine, vx, vy). Shape \
                  [nb_dec, bs, num_query, 10].
Returns:
list[dict]: Decoded boxes.
"""
all_cls_scores = preds_dicts['all_cls_scores'][-1]
all_bbox_preds = preds_dicts['all_bbox_preds'][-1]
batch_size = all_cls_scores.size()[0]
predictions_list = []
for i in range(batch_size):
predictions_list.append(self.decode_single(all_cls_scores[i], all_bbox_preds[i]))
return predictions_list
from mmdet.core.bbox.match_costs import build_match_cost
from .match_cost import BBox3DL1Cost
__all__ = ['build_match_cost', 'BBox3DL1Cost']
import torch
from mmdet.core.bbox.match_costs.builder import MATCH_COST
@MATCH_COST.register_module()
class BBox3DL1Cost(object):
"""BBox3DL1Cost.
Args:
weight (int | float, optional): loss_weight
"""
def __init__(self, weight=1.):
self.weight = weight
def __call__(self, bbox_pred, gt_bboxes):
"""
        Args:
            bbox_pred (Tensor): Predicted 3D boxes in the normalized format
                (cx, cy, w, l, cz, h, rot_sine, rot_cosine, vx, vy).
                Shape [num_query, 10].
            gt_bboxes (Tensor): Ground truth 3D boxes in the same normalized
                format. Shape [num_gt, 10].
Returns:
torch.Tensor: bbox_cost value with weight
"""
bbox_cost = torch.cdist(bbox_pred, gt_bboxes, p=1)
return bbox_cost * self.weight
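# A tiny self-check sketch (shapes are illustrative): the cost between N
# predictions and M ground truth boxes is an [N, M] matrix of weighted
# pairwise L1 distances.
if __name__ == "__main__":
    cost_fn = BBox3DL1Cost(weight=0.25)
    cost = cost_fn(torch.rand(2, 10), torch.rand(3, 10))
    assert cost.shape == (2, 3)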
import torch
def normalize_bbox(bboxes, pc_range):
    # note: pc_range is accepted for API symmetry but not used here
cx = bboxes[..., 0:1]
cy = bboxes[..., 1:2]
cz = bboxes[..., 2:3]
w = bboxes[..., 3:4].log()
l = bboxes[..., 4:5].log()
h = bboxes[..., 5:6].log()
rot = bboxes[..., 6:7]
if bboxes.size(-1) > 7:
vx = bboxes[..., 7:8]
vy = bboxes[..., 8:9]
normalized_bboxes = torch.cat(
(cx, cy, w, l, cz, h, rot.sin(), rot.cos(), vx, vy), dim=-1
)
else:
normalized_bboxes = torch.cat(
(cx, cy, w, l, cz, h, rot.sin(), rot.cos()), dim=-1
)
return normalized_bboxes
def denormalize_bbox(normalized_bboxes, pc_range):
# rotation
rot_sine = normalized_bboxes[..., 6:7]
rot_cosine = normalized_bboxes[..., 7:8]
rot = torch.atan2(rot_sine, rot_cosine)
# center in the bev
cx = normalized_bboxes[..., 0:1]
cy = normalized_bboxes[..., 1:2]
cz = normalized_bboxes[..., 4:5]
# size
w = normalized_bboxes[..., 2:3]
l = normalized_bboxes[..., 3:4]
h = normalized_bboxes[..., 5:6]
w = w.exp()
l = l.exp()
h = h.exp()
if normalized_bboxes.size(-1) > 8:
# velocity
        vx = normalized_bboxes[..., 8:9]
        vy = normalized_bboxes[..., 9:10]
denormalized_bboxes = torch.cat([cx, cy, cz, w, l, h, rot, vx, vy], dim=-1)
else:
denormalized_bboxes = torch.cat([cx, cy, cz, w, l, h, rot], dim=-1)
return denormalized_bboxes
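# A minimal round-trip sketch (values are illustrative): a 9-dim box
# (cx, cy, cz, w, l, h, rot, vx, vy) is encoded to the 10-dim normalized
# format and decoded back; sizes go through log/exp and the yaw through
# sin/cos + atan2, so the round trip recovers the input.
if __name__ == "__main__":
    boxes = torch.tensor([[10.0, -4.0, -1.0, 1.9, 4.5, 1.6, 0.3, 2.0, 0.1]])
    pc_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]  # unused by normalize_bbox
    normalized = normalize_bbox(boxes, pc_range)        # shape [1, 10]
    recovered = denormalize_bbox(normalized, pc_range)  # shape [1, 9]
    assert torch.allclose(boxes, recovered, atol=1e-5)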
from .eval_hooks import CustomDistEvalHook
# Note: Considering that MMCV's EvalHook updated its interface in V1.3.16,
# in order to avoid strong version dependency, we did not directly
# inherit EvalHook but BaseDistEvalHook.
import bisect
import os.path as osp
import mmcv
import torch.distributed as dist
from mmcv.runner import DistEvalHook as BaseDistEvalHook
from mmcv.runner import EvalHook as BaseEvalHook
from torch.nn.modules.batchnorm import _BatchNorm
from mmdet.core.evaluation.eval_hooks import DistEvalHook
def _calc_dynamic_intervals(start_interval, dynamic_interval_list):
assert mmcv.is_list_of(dynamic_interval_list, tuple)
dynamic_milestones = [0]
dynamic_milestones.extend(
[dynamic_interval[0] for dynamic_interval in dynamic_interval_list])
dynamic_intervals = [start_interval]
dynamic_intervals.extend(
[dynamic_interval[1] for dynamic_interval in dynamic_interval_list])
return dynamic_milestones, dynamic_intervals
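# Illustrative example (values are hypothetical): start_interval=1 with
# dynamic_interval_list=[(8, 2), (11, 1)] yields milestones [0, 8, 11] and
# intervals [1, 2, 1]: evaluate every epoch before epoch 8, every 2 epochs
# from epoch 8, and every epoch again from epoch 11.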
class CustomDistEvalHook(BaseDistEvalHook):
def __init__(self, *args, dynamic_intervals=None, **kwargs):
super(CustomDistEvalHook, self).__init__(*args, **kwargs)
self.use_dynamic_intervals = dynamic_intervals is not None
if self.use_dynamic_intervals:
self.dynamic_milestones, self.dynamic_intervals = \
_calc_dynamic_intervals(self.interval, dynamic_intervals)
def _decide_interval(self, runner):
if self.use_dynamic_intervals:
progress = runner.epoch if self.by_epoch else runner.iter
step = bisect.bisect(self.dynamic_milestones, (progress + 1))
# Dynamically modify the evaluation interval
self.interval = self.dynamic_intervals[step - 1]
def before_train_epoch(self, runner):
"""Evaluate the model only at the start of training by epoch."""
self._decide_interval(runner)
super().before_train_epoch(runner)
def before_train_iter(self, runner):
self._decide_interval(runner)
super().before_train_iter(runner)
def _do_evaluate(self, runner):
"""perform evaluation and save ckpt."""
# Synchronization of BatchNorm's buffer (running_mean
# and running_var) is not supported in the DDP of pytorch,
# which may cause the inconsistent performance of models in
# different ranks, so we broadcast BatchNorm's buffers
# of rank 0 to other ranks to avoid this.
if self.broadcast_bn_buffer:
model = runner.model
for name, module in model.named_modules():
if isinstance(module,
_BatchNorm) and module.track_running_stats:
dist.broadcast(module.running_var, 0)
dist.broadcast(module.running_mean, 0)
if not self._should_evaluate(runner):
return
tmpdir = self.tmpdir
if tmpdir is None:
tmpdir = osp.join(runner.work_dir, '.eval_hook')
        from projects.mmdet3d_plugin.bevformer.apis.test import custom_multi_gpu_test  # avoid circular import
results = custom_multi_gpu_test(
runner.model,
self.dataloader,
tmpdir=tmpdir,
gpu_collect=self.gpu_collect)
if runner.rank == 0:
print('\n')
runner.log_buffer.output['eval_iter_num'] = len(self.dataloader)
# key_score = self.evaluate(runner, results)
self.dataloader.dataset.evaluate_miou(results,
runner=runner)
# if self.save_best:
# self._save_ckpt(runner, key_score)
from .nuscenes_dataset import CustomNuScenesDataset
from .nuscenes_occ import NuSceneOcc
from .builder import custom_build_dataset
__all__ = [
    'CustomNuScenesDataset', 'NuSceneOcc', 'custom_build_dataset'
]
# Copyright (c) OpenMMLab. All rights reserved.
import copy
import platform
import random
from functools import partial
import numpy as np
from mmcv.parallel import collate
from mmcv.runner import get_dist_info
from mmcv.utils import Registry, build_from_cfg
from torch.utils.data import DataLoader
from mmdet.datasets.samplers import GroupSampler
from projects.mmdet3d_plugin.datasets.samplers.group_sampler import DistributedGroupSampler
from projects.mmdet3d_plugin.datasets.samplers.distributed_sampler import DistributedSampler
from projects.mmdet3d_plugin.datasets.samplers.sampler import build_sampler
def build_dataloader(dataset,
samples_per_gpu,
workers_per_gpu,
num_gpus=1,
dist=True,
shuffle=True,
seed=None,
shuffler_sampler=None,
nonshuffler_sampler=None,
**kwargs):
"""Build PyTorch DataLoader.
In distributed training, each GPU/process has a dataloader.
In non-distributed training, there is only one dataloader for all GPUs.
Args:
dataset (Dataset): A PyTorch dataset.
samples_per_gpu (int): Number of training samples on each GPU, i.e.,
batch size of each GPU.
workers_per_gpu (int): How many subprocesses to use for data loading
for each GPU.
num_gpus (int): Number of GPUs. Only used in non-distributed training.
dist (bool): Distributed training/test or not. Default: True.
shuffle (bool): Whether to shuffle the data at every epoch.
Default: True.
        seed (int, optional): Random seed for the dataloader workers.
            Default: None.
        shuffler_sampler (dict, optional): Config of the sampler used when
            ``shuffle=True``. Default: dict(type='DistributedGroupSampler').
        nonshuffler_sampler (dict, optional): Config of the sampler used when
            ``shuffle=False``. Default: dict(type='DistributedSampler').
        kwargs: any keyword argument used to initialize the DataLoader.
Returns:
DataLoader: A PyTorch dataloader.
"""
rank, world_size = get_dist_info()
if dist:
        # DistributedGroupSampler shuffles the data while ensuring that the
        # images on each GPU belong to the same group
        if shuffle:
            sampler = build_sampler(
                shuffler_sampler if shuffler_sampler is not None else
                dict(type='DistributedGroupSampler'),
                dict(
                    dataset=dataset,
                    samples_per_gpu=samples_per_gpu,
                    num_replicas=world_size,
                    rank=rank,
                    seed=seed))
        else:
            sampler = build_sampler(
                nonshuffler_sampler if nonshuffler_sampler is not None else
                dict(type='DistributedSampler'),
                dict(
                    dataset=dataset,
                    num_replicas=world_size,
                    rank=rank,
                    shuffle=shuffle,
                    seed=seed))
batch_size = samples_per_gpu
num_workers = workers_per_gpu
else:
        # assert False, 'not supported in bevformer'
        print('WARNING: the non-distributed dataloader should only be used '
              'to measure inference speed!')
sampler = GroupSampler(dataset, samples_per_gpu) if shuffle else None
batch_size = num_gpus * samples_per_gpu
num_workers = num_gpus * workers_per_gpu
init_fn = partial(
worker_init_fn, num_workers=num_workers, rank=rank,
seed=seed) if seed is not None else None
data_loader = DataLoader(
dataset,
batch_size=batch_size,
sampler=sampler,
num_workers=num_workers,
collate_fn=partial(collate, samples_per_gpu=samples_per_gpu),
pin_memory=False,
worker_init_fn=init_fn,
**kwargs)
return data_loader
def worker_init_fn(worker_id, num_workers, rank, seed):
    # The seed of each worker equals
    # num_workers * rank + worker_id + user_seed
worker_seed = num_workers * rank + worker_id + seed
np.random.seed(worker_seed)
random.seed(worker_seed)
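# Hypothetical usage sketch (dataset and values are placeholders): build one
# dataloader per GPU for distributed training, shuffled by the default
# DistributedGroupSampler.
#
#   train_loader = build_dataloader(
#       my_dataset,                # any mmdet3d-style dataset instance
#       samples_per_gpu=1,
#       workers_per_gpu=4,
#       dist=True,
#       shuffle=True,
#       seed=42,
#       shuffler_sampler=dict(type='DistributedGroupSampler'),
#       nonshuffler_sampler=dict(type='DistributedSampler'))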
# Copyright (c) OpenMMLab. All rights reserved.
import platform
from mmcv.utils import Registry, build_from_cfg
from mmdet.datasets import DATASETS
from mmdet.datasets.builder import _concat_dataset
if platform.system() != 'Windows':
# https://github.com/pytorch/pytorch/issues/973
import resource
rlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
base_soft_limit = rlimit[0]
hard_limit = rlimit[1]
soft_limit = min(max(4096, base_soft_limit), hard_limit)
resource.setrlimit(resource.RLIMIT_NOFILE, (soft_limit, hard_limit))
OBJECTSAMPLERS = Registry('Object sampler')
def custom_build_dataset(cfg, default_args=None):
from mmdet3d.datasets.dataset_wrappers import CBGSDataset
from mmdet.datasets.dataset_wrappers import (ClassBalancedDataset,
ConcatDataset, RepeatDataset)
if isinstance(cfg, (list, tuple)):
dataset = ConcatDataset([custom_build_dataset(c, default_args) for c in cfg])
elif cfg['type'] == 'ConcatDataset':
dataset = ConcatDataset(
[custom_build_dataset(c, default_args) for c in cfg['datasets']],
cfg.get('separate_eval', True))
elif cfg['type'] == 'RepeatDataset':
dataset = RepeatDataset(
custom_build_dataset(cfg['dataset'], default_args), cfg['times'])
elif cfg['type'] == 'ClassBalancedDataset':
dataset = ClassBalancedDataset(
custom_build_dataset(cfg['dataset'], default_args), cfg['oversample_thr'])
elif cfg['type'] == 'CBGSDataset':
dataset = CBGSDataset(custom_build_dataset(cfg['dataset'], default_args))
elif isinstance(cfg.get('ann_file'), (list, tuple)):
dataset = _concat_dataset(cfg, default_args)
else:
dataset = build_from_cfg(cfg, DATASETS, default_args)
return dataset
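# A hypothetical config sketch: custom_build_dataset recurses through dataset
# wrappers, so wrapping the occupancy dataset in CBGSDataset works as below
# (paths and pipeline are placeholders):
#
#   train_dataset = custom_build_dataset(dict(
#       type='CBGSDataset',
#       dataset=dict(
#           type='NuSceneOcc',
#           data_root='data/nuscenes/',
#           ann_file='data/nuscenes/occ_infos_temporal_train.pkl',
#           queue_length=4,
#           pipeline=[...])))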
import copy
import random
from os import path as osp

import mmcv
import numpy as np
import torch
from mmcv.parallel import DataContainer as DC
from mmdet.datasets import DATASETS
from mmdet3d.core.bbox import Box3DMode, Coord3DMode, LiDARInstance3DBoxes
from mmdet3d.datasets import NuScenesDataset
from nuscenes.eval.common.utils import quaternion_yaw, Quaternion

from .nuscnes_eval import NuScenesEval_custom
from projects.mmdet3d_plugin.models.utils.visual import save_tensor
@DATASETS.register_module()
class CustomNuScenesDataset(NuScenesDataset):
r"""NuScenes Dataset.
    This dataset only adds camera intrinsics and extrinsics to the results.
"""
def __init__(self, queue_length=4, bev_size=(200, 200), overlap_test=False, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue_length = queue_length
self.overlap_test = overlap_test
self.bev_size = bev_size
def prepare_train_data(self, index):
"""
Training data preparation.
Args:
index (int): Index for accessing the target data.
Returns:
dict: Training data dict of the corresponding index.
"""
        queue = []
        # randomly drop one of the previous `queue_length` frames, keep the
        # rest in temporal order, then append the current frame
        index_list = list(range(index - self.queue_length, index))
        random.shuffle(index_list)
        index_list = sorted(index_list[1:])
        index_list.append(index)
for i in index_list:
i = max(0, i)
input_dict = self.get_data_info(i)
if input_dict is None:
return None
self.pre_pipeline(input_dict)
example = self.pipeline(input_dict)
if self.filter_empty_gt and \
(example is None or ~(example['gt_labels_3d']._data != -1).any()):
return None
queue.append(example)
return self.union2one(queue)
    def union2one(self, queue):
        """Stack the queued frames into one sample and convert each frame's
        can_bus pose into a delta relative to the previous frame."""
imgs_list = [each['img'].data for each in queue]
metas_map = {}
prev_scene_token = None
prev_pos = None
prev_angle = None
for i, each in enumerate(queue):
metas_map[i] = each['img_metas'].data
if metas_map[i]['scene_token'] != prev_scene_token:
metas_map[i]['prev_bev_exists'] = False
prev_scene_token = metas_map[i]['scene_token']
prev_pos = copy.deepcopy(metas_map[i]['can_bus'][:3])
prev_angle = copy.deepcopy(metas_map[i]['can_bus'][-1])
metas_map[i]['can_bus'][:3] = 0
metas_map[i]['can_bus'][-1] = 0
else:
metas_map[i]['prev_bev_exists'] = True
tmp_pos = copy.deepcopy(metas_map[i]['can_bus'][:3])
tmp_angle = copy.deepcopy(metas_map[i]['can_bus'][-1])
metas_map[i]['can_bus'][:3] -= prev_pos
metas_map[i]['can_bus'][-1] -= prev_angle
prev_pos = copy.deepcopy(tmp_pos)
prev_angle = copy.deepcopy(tmp_angle)
queue[-1]['img'] = DC(torch.stack(imgs_list), cpu_only=False, stack=True)
queue[-1]['img_metas'] = DC(metas_map, cpu_only=True)
queue = queue[-1]
return queue
def get_ann_info(self, index):
"""Get annotation info according to the given index.
Args:
index (int): Index of the annotation data to get.
Returns:
dict: Annotation information consists of the following keys:
- gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`): \
3D ground truth bboxes
- gt_labels_3d (np.ndarray): Labels of ground truths.
- gt_names (list[str]): Class names of ground truths.
"""
info = self.data_infos[index]
# filter out bbox containing no points
if self.use_valid_flag:
mask = info['valid_flag']
else:
mask = info['num_lidar_pts'] > 0
gt_bboxes_3d = info['gt_boxes'][mask]
gt_names_3d = info['gt_names'][mask]
gt_labels_3d = []
for cat in gt_names_3d:
if cat in self.CLASSES:
gt_labels_3d.append(self.CLASSES.index(cat))
else:
gt_labels_3d.append(-1)
gt_labels_3d = np.array(gt_labels_3d)
if self.with_velocity:
gt_velocity = info['gt_velocity'][mask]
nan_mask = np.isnan(gt_velocity[:, 0])
gt_velocity[nan_mask] = [0.0, 0.0]
gt_bboxes_3d = np.concatenate([gt_bboxes_3d, gt_velocity], axis=-1)
# the nuscenes box center is [0.5, 0.5, 0.5], we change it to be
# the same as KITTI (0.5, 0.5, 0)
gt_bboxes_3d = LiDARInstance3DBoxes(
gt_bboxes_3d,
box_dim=gt_bboxes_3d.shape[-1],
origin=(0.5, 0.5, 0.5)).convert_to(self.box_mode_3d)
anns_results = dict(
gt_bboxes_3d=gt_bboxes_3d,
gt_labels_3d=gt_labels_3d,
gt_names=gt_names_3d)
return anns_results
def get_data_info(self, index):
"""Get data info according to the given index.
Args:
index (int): Index of the sample data to get.
Returns:
dict: Data information that will be passed to the data \
preprocessing pipelines. It includes the following keys:
- sample_idx (str): Sample index.
- pts_filename (str): Filename of point clouds.
- sweeps (list[dict]): Infos of sweeps.
- timestamp (float): Sample timestamp.
- img_filename (str, optional): Image filename.
- lidar2img (list[np.ndarray], optional): Transformations \
from lidar to different cameras.
- ann_info (dict): Annotation info.
"""
info = self.data_infos[index]
        # standard protocol modified from SECOND.Pytorch
input_dict = dict(
sample_idx=info['token'],
pts_filename=info['lidar_path'],
sweeps=info['sweeps'],
ego2global_translation=info['ego2global_translation'],
ego2global_rotation=info['ego2global_rotation'],
prev_idx=info['prev'],
next_idx=info['next'],
scene_token=info['scene_token'],
can_bus=info['can_bus'],
frame_idx=info['frame_idx'],
timestamp=info['timestamp'] / 1e6,
)
if self.modality['use_camera']:
image_paths = []
lidar2img_rts = []
lidar2cam_rts = []
cam_intrinsics = []
for cam_type, cam_info in info['cams'].items():
image_paths.append(cam_info['data_path'])
# obtain lidar to image transformation matrix
lidar2cam_r = np.linalg.inv(cam_info['sensor2lidar_rotation'])
lidar2cam_t = cam_info[
'sensor2lidar_translation'] @ lidar2cam_r.T
lidar2cam_rt = np.eye(4)
lidar2cam_rt[:3, :3] = lidar2cam_r.T
lidar2cam_rt[3, :3] = -lidar2cam_t
intrinsic = cam_info['cam_intrinsic']
viewpad = np.eye(4)
viewpad[:intrinsic.shape[0], :intrinsic.shape[1]] = intrinsic
lidar2img_rt = (viewpad @ lidar2cam_rt.T)
lidar2img_rts.append(lidar2img_rt)
cam_intrinsics.append(viewpad)
lidar2cam_rts.append(lidar2cam_rt.T)
input_dict.update(
dict(
img_filename=image_paths,
lidar2img=lidar2img_rts,
cam_intrinsic=cam_intrinsics,
lidar2cam=lidar2cam_rts,
))
if not self.test_mode:
annos = self.get_ann_info(index)
input_dict['ann_info'] = annos
rotation = Quaternion(input_dict['ego2global_rotation'])
translation = input_dict['ego2global_translation']
can_bus = input_dict['can_bus']
can_bus[:3] = translation
can_bus[3:7] = rotation
        # store the ego yaw in degrees within [0, 360) plus a radian copy
        patch_angle = quaternion_yaw(rotation) / np.pi * 180
if patch_angle < 0:
patch_angle += 360
can_bus[-2] = patch_angle / 180 * np.pi
can_bus[-1] = patch_angle
return input_dict
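    # Assumed layout of the 18-dim can_bus vector after the patch above
    # (following the data-creation convention, not guaranteed by this file):
    # [0:3] global translation, [3:7] global rotation quaternion,
    # [7:10] acceleration, [10:13] rotation rate, [13:16] velocity,
    # [16] yaw in radians, [17] yaw in degrees.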
def __getitem__(self, idx):
"""Get item from infos according to the given index.
Returns:
dict: Data dictionary of the corresponding index.
"""
if self.test_mode:
return self.prepare_test_data(idx)
while True:
data = self.prepare_train_data(idx)
if data is None:
idx = self._rand_another(idx)
continue
return data
def _evaluate_single(self,
result_path,
logger=None,
metric='bbox',
result_name='pts_bbox'):
"""Evaluation for a single model in nuScenes protocol.
Args:
result_path (str): Path of the result file.
logger (logging.Logger | str | None): Logger used for printing
related information during evaluation. Default: None.
metric (str): Metric name used for evaluation. Default: 'bbox'.
result_name (str): Result name in the metric prefix.
Default: 'pts_bbox'.
Returns:
dict: Dictionary of evaluation details.
"""
from nuscenes import NuScenes
self.nusc = NuScenes(version=self.version, dataroot=self.data_root,
verbose=True)
output_dir = osp.join(*osp.split(result_path)[:-1])
eval_set_map = {
'v1.0-mini': 'mini_val',
'v1.0-trainval': 'val',
}
self.nusc_eval = NuScenesEval_custom(
self.nusc,
config=self.eval_detection_configs,
result_path=result_path,
eval_set=eval_set_map[self.version],
output_dir=output_dir,
verbose=True,
overlap_test=self.overlap_test,
data_infos=self.data_infos
)
self.nusc_eval.main(plot_examples=0, render_curves=False)
# record metrics
metrics = mmcv.load(osp.join(output_dir, 'metrics_summary.json'))
detail = dict()
metric_prefix = f'{result_name}_NuScenes'
for name in self.CLASSES:
for k, v in metrics['label_aps'][name].items():
val = float('{:.4f}'.format(v))
detail['{}/{}_AP_dist_{}'.format(metric_prefix, name, k)] = val
for k, v in metrics['label_tp_errors'][name].items():
val = float('{:.4f}'.format(v))
detail['{}/{}_{}'.format(metric_prefix, name, k)] = val
for k, v in metrics['tp_errors'].items():
val = float('{:.4f}'.format(v))
detail['{}/{}'.format(metric_prefix,
self.ErrNameMapping[k])] = val
detail['{}/NDS'.format(metric_prefix)] = metrics['nd_score']
detail['{}/mAP'.format(metric_prefix)] = metrics['mean_ap']
return detail
import copy
import os
import random
from os import path as osp

import mmcv
import numpy as np
import torch
from tqdm import tqdm
from mmcv.parallel import DataContainer as DC
from mmdet.datasets import DATASETS
from mmdet3d.datasets import NuScenesDataset
from nuscenes.eval.common.utils import quaternion_yaw, Quaternion
from nuscenes.utils.geometry_utils import transform_matrix

from .nuscnes_eval import NuScenesEval_custom
from .occ_metrics import Metric_mIoU, Metric_FScore
from projects.mmdet3d_plugin.models.utils.visual import save_tensor
@DATASETS.register_module()
class NuSceneOcc(NuScenesDataset):
r"""NuScenes Dataset.
    This dataset adds occupancy ground truth paths as well as camera
    intrinsics and extrinsics to the results.
"""
def __init__(self, queue_length=4, bev_size=(200, 200), overlap_test=False, eval_fscore=False, *args, **kwargs):
super().__init__(*args, **kwargs)
self.eval_fscore = eval_fscore
self.queue_length = queue_length
self.overlap_test = overlap_test
self.bev_size = bev_size
self.data_infos = self.load_annotations(self.ann_file)
def load_annotations(self, ann_file):
"""Load annotations from ann_file.
Args:
ann_file (str): Path of the annotation file.
Returns:
list[dict]: List of annotations sorted by timestamps.
"""
data = mmcv.load(ann_file)
# self.train_split=data['train_split']
# self.val_split=data['val_split']
data_infos = list(sorted(data['infos'], key=lambda e: e['timestamp']))
data_infos = data_infos[::self.load_interval]
self.metadata = data['metadata']
self.version = self.metadata['version']
return data_infos
def prepare_train_data(self, index):
"""
Training data preparation.
Args:
index (int): Index for accessing the target data.
Returns:
dict: Training data dict of the corresponding index.
"""
        queue = []
        # randomly drop one of the previous `queue_length` frames, keep the
        # rest in temporal order, then append the current frame
        index_list = list(range(index - self.queue_length, index))
        random.shuffle(index_list)
        index_list = sorted(index_list[1:])
        index_list.append(index)
for i in index_list:
i = max(0, i)
input_dict = self.get_data_info(i)
if input_dict is None:
return None
self.pre_pipeline(input_dict)
example = self.pipeline(input_dict)
queue.append(example)
return self.union2one(queue)
    def union2one(self, queue):
        """Stack the queued frames into one sample and convert each frame's
        can_bus pose into a delta relative to the previous frame."""
imgs_list = [each['img'].data for each in queue]
metas_map = {}
prev_scene_token = None
prev_pos = None
prev_angle = None
for i, each in enumerate(queue):
metas_map[i] = each['img_metas'].data
if metas_map[i]['scene_token'] != prev_scene_token:
metas_map[i]['prev_bev_exists'] = False
prev_scene_token = metas_map[i]['scene_token']
prev_pos = copy.deepcopy(metas_map[i]['can_bus'][:3])
prev_angle = copy.deepcopy(metas_map[i]['can_bus'][-1])
metas_map[i]['can_bus'][:3] = 0
metas_map[i]['can_bus'][-1] = 0
else:
metas_map[i]['prev_bev_exists'] = True
tmp_pos = copy.deepcopy(metas_map[i]['can_bus'][:3])
tmp_angle = copy.deepcopy(metas_map[i]['can_bus'][-1])
metas_map[i]['can_bus'][:3] -= prev_pos
metas_map[i]['can_bus'][-1] -= prev_angle
prev_pos = copy.deepcopy(tmp_pos)
prev_angle = copy.deepcopy(tmp_angle)
queue[-1]['img'] = DC(torch.stack(imgs_list), cpu_only=False, stack=True)
queue[-1]['img_metas'] = DC(metas_map, cpu_only=True)
queue = queue[-1]
return queue
def get_data_info(self, index):
"""Get data info according to the given index.
Args:
index (int): Index of the sample data to get.
Returns:
dict: Data information that will be passed to the data \
preprocessing pipelines. It includes the following keys:
                - occ_gt_path (str): Path of the occupancy GT file.
                - sample_idx (str): Sample index.
- pts_filename (str): Filename of point clouds.
- sweeps (list[dict]): Infos of sweeps.
- timestamp (float): Sample timestamp.
- img_filename (str, optional): Image filename.
- lidar2img (list[np.ndarray], optional): Transformations \
from lidar to different cameras.
- ann_info (dict): Annotation info.
"""
info = self.data_infos[index]
        # standard protocol modified from SECOND.Pytorch
input_dict = dict(
occ_gt_path=info['occ_gt_path'],
sample_idx=info['token'],
pts_filename=info['lidar_path'],
sweeps=info['sweeps'],
ego2global_translation=info['ego2global_translation'],
ego2global_rotation=info['ego2global_rotation'],
prev_idx=info['prev'],
next_idx=info['next'],
scene_token=info['scene_token'],
can_bus=info['can_bus'],
frame_idx=info['frame_idx'],
timestamp=info['timestamp'] / 1e6,
)
        lidar2ego_rotation = info['lidar2ego_rotation']
        lidar2ego_translation = info['lidar2ego_translation']
        # transform_matrix(..., inverse=True) returns the ego -> lidar
        # transform, i.e. the inverse of the lidar -> ego pose
        ego2lidar = transform_matrix(translation=lidar2ego_translation,
                                     rotation=Quaternion(lidar2ego_rotation),
                                     inverse=True)
        input_dict['ego2lidar'] = ego2lidar
if self.modality['use_camera']:
image_paths = []
lidar2img_rts = []
lidar2cam_rts = []
cam_intrinsics = []
for cam_type, cam_info in info['cams'].items():
image_paths.append(cam_info['data_path'])
# obtain lidar to image transformation matrix
lidar2cam_r = np.linalg.inv(cam_info['sensor2lidar_rotation'])
lidar2cam_t = cam_info[
'sensor2lidar_translation'] @ lidar2cam_r.T
lidar2cam_rt = np.eye(4)
lidar2cam_rt[:3, :3] = lidar2cam_r.T
lidar2cam_rt[3, :3] = -lidar2cam_t
intrinsic = cam_info['cam_intrinsic']
viewpad = np.eye(4)
viewpad[:intrinsic.shape[0], :intrinsic.shape[1]] = intrinsic
lidar2img_rt = (viewpad @ lidar2cam_rt.T)
lidar2img_rts.append(lidar2img_rt)
cam_intrinsics.append(viewpad)
lidar2cam_rts.append(lidar2cam_rt.T)
input_dict.update(
dict(
img_filename=image_paths,
lidar2img=lidar2img_rts,
cam_intrinsic=cam_intrinsics,
lidar2cam=lidar2cam_rts,
))
if not self.test_mode:
annos = self.get_ann_info(index)
input_dict['ann_info'] = annos
rotation = Quaternion(input_dict['ego2global_rotation'])
translation = input_dict['ego2global_translation']
can_bus = input_dict['can_bus']
can_bus[:3] = translation
can_bus[3:7] = rotation
        # store the ego yaw in degrees within [0, 360) plus a radian copy
        patch_angle = quaternion_yaw(rotation) / np.pi * 180
if patch_angle < 0:
patch_angle += 360
can_bus[-2] = patch_angle / 180 * np.pi
can_bus[-1] = patch_angle
return input_dict
def __getitem__(self, idx):
"""Get item from infos according to the given index.
Returns:
dict: Data dictionary of the corresponding index.
"""
if self.test_mode:
return self.prepare_test_data(idx)
while True:
data = self.prepare_train_data(idx)
if data is None:
idx = self._rand_another(idx)
continue
return data
def evaluate_miou(self, occ_results, runner=None, show_dir=None, **eval_kwargs):
if show_dir is not None:
if not os.path.exists(show_dir):
os.mkdir(show_dir)
print('\nSaving output and gt in {} for visualization.'.format(show_dir))
            begin = eval_kwargs.get('begin', None)
            end = eval_kwargs.get('end', None)
self.occ_eval_metrics = Metric_mIoU(
num_classes=18,
use_lidar_mask=False,
use_image_mask=True)
if self.eval_fscore:
self.fscore_eval_metrics = Metric_FScore(
leaf_size=10,
threshold_acc=0.4,
threshold_complete=0.4,
voxel_size=[0.4, 0.4, 0.4],
range=[-40, -40, -1, 40, 40, 5.4],
void=[17, 255],
use_lidar_mask=False,
use_image_mask=True,
)
print('\nStarting Evaluation...')
for index, occ_pred in enumerate(tqdm(occ_results)):
info = self.data_infos[index]
occ_gt = np.load(os.path.join(self.data_root, info['occ_gt_path']))
            if show_dir is not None:
                if begin is not None and end is not None:
                    if index >= begin and index < end:
                        sample_token = info['token']
                        save_path = os.path.join(show_dir, str(index).zfill(4))
                        np.savez_compressed(save_path, pred=occ_pred,
                                            gt=occ_gt, sample_token=sample_token)
                else:
                    sample_token = info['token']
                    save_path = os.path.join(show_dir, str(index).zfill(4))
                    np.savez_compressed(save_path, pred=occ_pred,
                                        gt=occ_gt, sample_token=sample_token)
gt_semantics = occ_gt['semantics']
mask_lidar = occ_gt['mask_lidar'].astype(bool)
mask_camera = occ_gt['mask_camera'].astype(bool)
self.occ_eval_metrics.add_batch(occ_pred, gt_semantics, mask_lidar, mask_camera)
if self.eval_fscore:
self.fscore_eval_metrics.add_batch(occ_pred, gt_semantics, mask_lidar, mask_camera)
self.occ_eval_metrics.count_miou()
if self.eval_fscore:
self.fscore_eval_metrics.count_fscore()
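# For reference, a minimal (illustrative) version of the masked mIoU that
# Metric_mIoU accumulates might look like the sketch below, assuming 18
# semantic classes; the real metric builds a confusion matrix over all
# samples before computing per-class IoU, so per-sample values will differ.
#
#   def masked_miou(pred, gt, mask, num_classes=18):
#       pred, gt = pred[mask], gt[mask]
#       ious = []
#       for c in range(num_classes):
#           inter = np.logical_and(pred == c, gt == c).sum()
#           union = np.logical_or(pred == c, gt == c).sum()
#           if union > 0:
#               ious.append(inter / union)
#       return float(np.mean(ious)) if ious else 0.0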
import argparse
import copy
import json
import os
import random
import time
from typing import Tuple, Dict, Any

import cv2
import numpy as np
import pycocotools.mask as mask_util
import torch
import tqdm
from matplotlib import pyplot as plt
from pyquaternion import Quaternion
from torchvision.transforms.functional import rotate

from nuscenes import NuScenes
from nuscenes.eval.common.config import config_factory
from nuscenes.eval.common.data_classes import EvalBoxes
from nuscenes.eval.common.loaders import load_prediction, load_gt, add_center_dist, filter_eval_boxes  # load_gt is shadowed by the local definition below
from nuscenes.eval.common.render import setup_axis
from nuscenes.eval.common.utils import quaternion_yaw, boxes_to_sensor
from nuscenes.eval.detection.algo import accumulate, calc_ap, calc_tp
from nuscenes.eval.detection.constants import TP_METRICS, DETECTION_NAMES, DETECTION_COLORS, \
    TP_METRICS_UNITS, PRETTY_DETECTION_NAMES, PRETTY_TP_METRICS
from nuscenes.eval.detection.data_classes import DetectionConfig, DetectionMetrics, DetectionBox, \
    DetectionMetricData, DetectionMetricDataList
from nuscenes.eval.detection.evaluate import NuScenesEval
from nuscenes.eval.detection.render import summary_plot, class_pr_curve, dist_pr_curve, visualize_sample
from nuscenes.eval.detection.utils import category_to_detection_name
from nuscenes.eval.tracking.data_classes import TrackingBox
from nuscenes.utils.data_classes import Box, LidarPointCloud
from nuscenes.utils.geometry_utils import points_in_box, view_points, box_in_image, BoxVisibility, transform_matrix
from nuscenes.utils.splits import create_splits_scenes

from mmdet3d.core.bbox.iou_calculators import BboxOverlaps3D
# from projects.mmdet3d_plugin.models.utils.visual import save_tensor

Axis = Any
def class_tp_curve(md_list: DetectionMetricDataList,
metrics: DetectionMetrics,
detection_name: str,
min_recall: float,
dist_th_tp: float,
savepath: str = None,
ax: Axis = None) -> None:
"""
Plot the true positive curve for the specified class.
:param md_list: DetectionMetricDataList instance.
:param metrics: DetectionMetrics instance.
:param detection_name:
:param min_recall: Minimum recall value.
:param dist_th_tp: The distance threshold used to determine matches.
:param savepath: If given, saves the the rendering here instead of displaying.
:param ax: Axes onto which to render.
"""
# Get metric data for given detection class with tp distance threshold.
md = md_list[(detection_name, dist_th_tp)]
min_recall_ind = round(100 * min_recall)
if min_recall_ind <= md.max_recall_ind:
# For traffic_cone and barrier only a subset of the metrics are plotted.
rel_metrics = [m for m in TP_METRICS if not np.isnan(metrics.get_label_tp(detection_name, m))]
ylimit = max([max(getattr(md, metric)[min_recall_ind:md.max_recall_ind + 1]) for metric in rel_metrics]) * 1.1
else:
ylimit = 1.0
# Prepare axis.
if ax is None:
ax = setup_axis(title=PRETTY_DETECTION_NAMES[detection_name], xlabel='Recall', ylabel='Error', xlim=1,
min_recall=min_recall)
ax.set_ylim(0, ylimit)
# Plot the recall vs. error curve for each tp metric.
for metric in TP_METRICS:
tp = metrics.get_label_tp(detection_name, metric)
# Plot only if we have valid data.
if tp is not np.nan and min_recall_ind <= md.max_recall_ind:
recall, error = md.recall[:md.max_recall_ind + 1], getattr(md, metric)[:md.max_recall_ind + 1]
else:
recall, error = [], []
# Change legend based on tp value
if tp is np.nan:
label = '{}: n/a'.format(PRETTY_TP_METRICS[metric])
elif min_recall_ind > md.max_recall_ind:
label = '{}: nan'.format(PRETTY_TP_METRICS[metric])
else:
label = '{}: {:.2f} ({})'.format(PRETTY_TP_METRICS[metric], tp, TP_METRICS_UNITS[metric])
if metric == 'trans_err':
label += f' ({md.max_recall_ind})' # add recall
print(f'Recall: {detection_name}: {md.max_recall_ind/100}')
ax.plot(recall, error, label=label)
ax.axvline(x=md.max_recall, linestyle='-.', color=(0, 0, 0, 0.3))
ax.legend(loc='best')
if savepath is not None:
plt.savefig(savepath)
plt.close()
class DetectionBox_modified(DetectionBox):
def __init__(self, *args, token=None, visibility=None, index=None, **kwargs):
        '''
        Extend DetectionBox with the annotation token, visibility level and
        temporal index.
        '''
super().__init__(*args, **kwargs)
self.token = token
self.visibility = visibility
self.index = index
def serialize(self) -> dict:
""" Serialize instance into json-friendly format. """
return {
'token': self.token,
'sample_token': self.sample_token,
'translation': self.translation,
'size': self.size,
'rotation': self.rotation,
'velocity': self.velocity,
'ego_translation': self.ego_translation,
'num_pts': self.num_pts,
'detection_name': self.detection_name,
'detection_score': self.detection_score,
'attribute_name': self.attribute_name,
'visibility': self.visibility,
'index': self.index
}
@classmethod
def deserialize(cls, content: dict):
""" Initialize from serialized content. """
return cls(
token=content['token'],
sample_token=content['sample_token'],
translation=tuple(content['translation']),
size=tuple(content['size']),
rotation=tuple(content['rotation']),
velocity=tuple(content['velocity']),
ego_translation=(0.0, 0.0, 0.0) if 'ego_translation' not in content
else tuple(content['ego_translation']),
num_pts=-1 if 'num_pts' not in content else int(content['num_pts']),
detection_name=content['detection_name'],
detection_score=-1.0 if 'detection_score' not in content else float(content['detection_score']),
attribute_name=content['attribute_name'],
visibility=content['visibility'],
index=content['index'],
)
def center_in_image(box, intrinsic: np.ndarray, imsize: Tuple[int, int], vis_level: int = BoxVisibility.ANY) -> bool:
"""
Check if a box is visible inside an image without accounting for occlusions.
:param box: The box to be checked.
:param intrinsic: <float: 3, 3>. Intrinsic camera matrix.
:param imsize: (width, height).
:param vis_level: One of the enumerations of <BoxVisibility>.
:return True if visibility condition is satisfied.
"""
center_3d = box.center.reshape(3, 1)
center_img = view_points(center_3d, intrinsic, normalize=True)[:2, :]
visible = np.logical_and(center_img[0, :] > 0, center_img[0, :] < imsize[0])
visible = np.logical_and(visible, center_img[1, :] < imsize[1])
visible = np.logical_and(visible, center_img[1, :] > 0)
visible = np.logical_and(visible, center_3d[2, :] > 1)
    in_front = center_3d[2, :] > 0.1  # True if the box center is at least 0.1 meter in front of the camera.
if vis_level == BoxVisibility.ALL:
return all(visible) and all(in_front)
elif vis_level == BoxVisibility.ANY:
return any(visible) and all(in_front)
elif vis_level == BoxVisibility.NONE:
return True
else:
raise ValueError("vis_level: {} not valid".format(vis_level))
def exist_corners_in_image_but_not_all(box, intrinsic: np.ndarray, imsize: Tuple[int, int],
vis_level: int = BoxVisibility.ANY) -> bool:
"""
    Check if some, but not all, corners of a box are visible in the image.
:param box: The box to be checked.
:param intrinsic: <float: 3, 3>. Intrinsic camera matrix.
:param imsize: (width, height).
:param vis_level: One of the enumerations of <BoxVisibility>.
:return True if visibility condition is satisfied.
"""
corners_3d = box.corners()
corners_img = view_points(corners_3d, intrinsic, normalize=True)[:2, :]
visible = np.logical_and(corners_img[0, :] > 0, corners_img[0, :] < imsize[0])
visible = np.logical_and(visible, corners_img[1, :] < imsize[1])
visible = np.logical_and(visible, corners_img[1, :] > 0)
visible = np.logical_and(visible, corners_3d[2, :] > 1)
in_front = corners_3d[2, :] > 0.1 # True if a corner is at least 0.1 meter in front of the camera.
    return any(visible) and not all(visible) and all(in_front)
def load_gt(nusc: NuScenes, eval_split: str, box_cls, verbose: bool = False):
"""
Loads ground truth boxes from DB.
:param nusc: A NuScenes instance.
:param eval_split: The evaluation split for which we load GT boxes.
:param box_cls: Type of box to load, e.g. DetectionBox or TrackingBox.
:param verbose: Whether to print messages to stdout.
:return: The GT boxes.
"""
# Init.
if box_cls == DetectionBox_modified:
attribute_map = {a['token']: a['name'] for a in nusc.attribute}
if verbose:
print('Loading annotations for {} split from nuScenes version: {}'.format(eval_split, nusc.version))
# Read out all sample_tokens in DB.
sample_tokens_all = [s['token'] for s in nusc.sample]
assert len(sample_tokens_all) > 0, "Error: Database has no samples!"
# Only keep samples from this split.
splits = create_splits_scenes()
# Check compatibility of split with nusc_version.
version = nusc.version
if eval_split in {'train', 'val', 'train_detect', 'train_track'}:
assert version.endswith('trainval'), \
'Error: Requested split {} which is not compatible with NuScenes version {}'.format(eval_split, version)
elif eval_split in {'mini_train', 'mini_val'}:
assert version.endswith('mini'), \
'Error: Requested split {} which is not compatible with NuScenes version {}'.format(eval_split, version)
elif eval_split == 'test':
assert version.endswith('test'), \
'Error: Requested split {} which is not compatible with NuScenes version {}'.format(eval_split, version)
else:
raise ValueError('Error: Requested split {} which this function cannot map to the correct NuScenes version.'
.format(eval_split))
if eval_split == 'test':
# Check that you aren't trying to cheat :).
assert len(nusc.sample_annotation) > 0, \
'Error: You are trying to evaluate on the test set but you do not have the annotations!'
index_map = {}
for scene in nusc.scene:
first_sample_token = scene['first_sample_token']
sample = nusc.get('sample', first_sample_token)
index_map[first_sample_token] = 1
index = 2
while sample['next'] != '':
sample = nusc.get('sample', sample['next'])
index_map[sample['token']] = index
index += 1
sample_tokens = []
for sample_token in sample_tokens_all:
scene_token = nusc.get('sample', sample_token)['scene_token']
scene_record = nusc.get('scene', scene_token)
if scene_record['name'] in splits[eval_split]:
sample_tokens.append(sample_token)
all_annotations = EvalBoxes()
# Load annotations and filter predictions and annotations.
tracking_id_set = set()
for sample_token in tqdm.tqdm(sample_tokens, leave=verbose):
sample = nusc.get('sample', sample_token)
sample_annotation_tokens = sample['anns']
sample_boxes = []
for sample_annotation_token in sample_annotation_tokens:
sample_annotation = nusc.get('sample_annotation', sample_annotation_token)
if box_cls == DetectionBox_modified:
# Get label name in detection task and filter unused labels.
detection_name = category_to_detection_name(sample_annotation['category_name'])
if detection_name is None:
continue
# Get attribute_name.
attr_tokens = sample_annotation['attribute_tokens']
attr_count = len(attr_tokens)
if attr_count == 0:
attribute_name = ''
elif attr_count == 1:
attribute_name = attribute_map[attr_tokens[0]]
else:
raise Exception('Error: GT annotations must not have more than one attribute!')
sample_boxes.append(
box_cls(
token=sample_annotation_token,
sample_token=sample_token,
translation=sample_annotation['translation'],
size=sample_annotation['size'],
rotation=sample_annotation['rotation'],
velocity=nusc.box_velocity(sample_annotation['token'])[:2],
num_pts=sample_annotation['num_lidar_pts'] + sample_annotation['num_radar_pts'],
detection_name=detection_name,
detection_score=-1.0, # GT samples do not have a score.
attribute_name=attribute_name,
visibility=sample_annotation['visibility_token'],
index=index_map[sample_token]
)
)
elif box_cls == TrackingBox:
assert False
else:
raise NotImplementedError('Error: Invalid box_cls %s!' % box_cls)
all_annotations.add_boxes(sample_token, sample_boxes)
if verbose:
print("Loaded ground truth annotations for {} samples.".format(len(all_annotations.sample_tokens)))
return all_annotations
def filter_eval_boxes_by_id(nusc: NuScenes,
eval_boxes: EvalBoxes,
id=None,
verbose: bool = False) -> EvalBoxes:
"""
    Filters boxes, keeping only those whose annotation token is in `id`.
    :param nusc: An instance of the NuScenes class.
    :param eval_boxes: An instance of the EvalBoxes class.
    :param id: The set of annotation tokens used to keep boxes.
    :param verbose: Whether to print to stdout.
"""
# Accumulators for number of filtered boxes.
total, anns_filter = 0, 0
for ind, sample_token in enumerate(eval_boxes.sample_tokens):
# Filter on anns
total += len(eval_boxes[sample_token])
filtered_boxes = []
for box in eval_boxes[sample_token]:
if box.token in id:
filtered_boxes.append(box)
anns_filter += len(filtered_boxes)
eval_boxes.boxes[sample_token] = filtered_boxes
if verbose:
print("=> Original number of boxes: %d" % total)
print("=> After anns based filtering: %d" % anns_filter)
return eval_boxes
def filter_eval_boxes_by_visibility(
ori_eval_boxes: EvalBoxes,
visibility=None,
verbose: bool = False) -> EvalBoxes:
"""
    Filters boxes by visibility level.
    :param ori_eval_boxes: An instance of the EvalBoxes class.
    :param visibility: The visibility token used to keep boxes.
    :param verbose: Whether to print to stdout.
"""
# Accumulators for number of filtered boxes.
eval_boxes = copy.deepcopy(ori_eval_boxes)
total, anns_filter = 0, 0
for ind, sample_token in enumerate(eval_boxes.sample_tokens):
# Filter on anns
total += len(eval_boxes[sample_token])
filtered_boxes = []
for box in eval_boxes[sample_token]:
if box.visibility == visibility:
filtered_boxes.append(box)
anns_filter += len(filtered_boxes)
eval_boxes.boxes[sample_token] = filtered_boxes
if verbose:
print("=> Original number of boxes: %d" % total)
print("=> After visibility based filtering: %d" % anns_filter)
return eval_boxes
def filter_by_sample_token(ori_eval_boxes, valid_sample_tokens=[], verbose=False):
eval_boxes = copy.deepcopy(ori_eval_boxes)
for sample_token in eval_boxes.sample_tokens:
if sample_token not in valid_sample_tokens:
eval_boxes.boxes.pop(sample_token)
return eval_boxes
def filter_eval_boxes_by_overlap(nusc: NuScenes,
eval_boxes: EvalBoxes,
verbose: bool = False) -> EvalBoxes:
"""
    Filters ground truth boxes, keeping only those whose center is visible in more than one camera (multi-camera overlap regions).
:param nusc: An instance of the NuScenes class.
:param eval_boxes: An instance of the EvalBoxes class.
:param verbose: Whether to print to stdout.
"""
# Accumulators for number of filtered boxes.
cams = ['CAM_FRONT',
'CAM_FRONT_RIGHT',
'CAM_BACK_RIGHT',
'CAM_BACK',
'CAM_BACK_LEFT',
'CAM_FRONT_LEFT']
total, anns_filter = 0, 0
for ind, sample_token in enumerate(eval_boxes.sample_tokens):
# Filter on anns
total += len(eval_boxes[sample_token])
sample_record = nusc.get('sample', sample_token)
filtered_boxes = []
for box in eval_boxes[sample_token]:
count = 0
for cam in cams:
                # copied from the nuScenes devkit
sample_data_token = sample_record['data'][cam]
sd_record = nusc.get('sample_data', sample_data_token)
cs_record = nusc.get('calibrated_sensor', sd_record['calibrated_sensor_token'])
sensor_record = nusc.get('sensor', cs_record['sensor_token'])
pose_record = nusc.get('ego_pose', sd_record['ego_pose_token'])
cam_intrinsic = np.array(cs_record['camera_intrinsic'])
imsize = (sd_record['width'], sd_record['height'])
new_box = Box(box.translation, box.size, Quaternion(box.rotation),
name=box.detection_name, token='')
# Move box to ego vehicle coord system.
new_box.translate(-np.array(pose_record['translation']))
new_box.rotate(Quaternion(pose_record['rotation']).inverse)
# Move box to sensor coord system.
new_box.translate(-np.array(cs_record['translation']))
new_box.rotate(Quaternion(cs_record['rotation']).inverse)
if center_in_image(new_box, cam_intrinsic, imsize, vis_level=BoxVisibility.ANY):
count += 1
# if exist_corners_in_image_but_not_all(new_box, cam_intrinsic, imsize, vis_level=BoxVisibility.ANY):
# count += 1
            if count > 1:
                # log boxes visible in more than one camera; prediction boxes
                # have no annotation token, so the write may fail and is
                # silently skipped
                with open('center_overlap.txt', 'a') as f:
                    try:
                        f.write(box.token + '\n')
                    except Exception:
                        pass
                filtered_boxes.append(box)
anns_filter += len(filtered_boxes)
eval_boxes.boxes[sample_token] = filtered_boxes
    verbose = True  # NOTE: verbose is forced on here, overriding the argument
if verbose:
print("=> Original number of boxes: %d" % total)
print("=> After anns based filtering: %d" % anns_filter)
return eval_boxes
class NuScenesEval_custom(NuScenesEval):
"""
Dummy class for backward-compatibility. Same as DetectionEval.
"""
def __init__(self,
nusc: NuScenes,
config: DetectionConfig,
result_path: str,
eval_set: str,
output_dir: str = None,
verbose: bool = True,
overlap_test=False,
eval_mask=False,
data_infos=None
):
"""
Initialize a DetectionEval object.
:param nusc: A NuScenes object.
:param config: A DetectionConfig object.
:param result_path: Path of the nuScenes JSON result file.
:param eval_set: The dataset split to evaluate on, e.g. train, val or test.
:param output_dir: Folder to save plots and results to.
        :param verbose: Whether to print to stdout.
        :param overlap_test: Whether to filter boxes to multi-camera overlap regions.
        :param eval_mask: Whether to evaluate with a mask.
        :param data_infos: Dataset infos used by the custom evaluation.
"""
self.nusc = nusc
self.result_path = result_path
self.eval_set = eval_set
self.output_dir = output_dir
self.verbose = verbose
self.cfg = config
self.overlap_test = overlap_test
self.eval_mask = eval_mask
self.data_infos = data_infos
# Check result file exists.
assert os.path.exists(result_path), 'Error: The result file does not exist!'
# Make dirs.
self.plot_dir = os.path.join(self.output_dir, 'plots')
if not os.path.isdir(self.output_dir):
os.makedirs(self.output_dir)
if not os.path.isdir(self.plot_dir):
os.makedirs(self.plot_dir)
# Load data.
if verbose:
print('Initializing nuScenes detection evaluation')
self.pred_boxes, self.meta = load_prediction(self.result_path, self.cfg.max_boxes_per_sample, DetectionBox,
verbose=verbose)
self.gt_boxes = load_gt(self.nusc, self.eval_set, DetectionBox_modified, verbose=verbose)
assert set(self.pred_boxes.sample_tokens) == set(self.gt_boxes.sample_tokens), \
"Samples in split doesn't match samples in predictions."
# Add center distances.
self.pred_boxes = add_center_dist(nusc, self.pred_boxes)
self.gt_boxes = add_center_dist(nusc, self.gt_boxes)
# Filter boxes (distance, points per box, etc.).
if verbose:
print('Filtering predictions')
self.pred_boxes = filter_eval_boxes(nusc, self.pred_boxes, self.cfg.class_range, verbose=verbose)
if verbose:
print('Filtering ground truth annotations')
self.gt_boxes = filter_eval_boxes(nusc, self.gt_boxes, self.cfg.class_range, verbose=verbose)
if self.overlap_test:
self.pred_boxes = filter_eval_boxes_by_overlap(self.nusc, self.pred_boxes)
self.gt_boxes = filter_eval_boxes_by_overlap(self.nusc, self.gt_boxes, verbose=True)
self.all_gt = copy.deepcopy(self.gt_boxes)
self.all_preds = copy.deepcopy(self.pred_boxes)
self.sample_tokens = self.gt_boxes.sample_tokens
self.index_map = {}
for scene in nusc.scene:
first_sample_token = scene['first_sample_token']
sample = nusc.get('sample', first_sample_token)
self.index_map[first_sample_token] = 1
index = 2
while sample['next'] != '':
sample = nusc.get('sample', sample['next'])
self.index_map[sample['token']] = index
index += 1
def update_gt(self, type_='vis', visibility='1', index=1):
if type_ == 'vis':
self.visibility_test = True
if self.visibility_test:
'''[{'description': 'visibility of whole object is between 0 and 40%',
'token': '1',
'level': 'v0-40'},
{'description': 'visibility of whole object is between 40 and 60%',
'token': '2',
'level': 'v40-60'},
{'description': 'visibility of whole object is between 60 and 80%',
'token': '3',
'level': 'v60-80'},
{'description': 'visibility of whole object is between 80 and 100%',
'token': '4',
'level': 'v80-100'}]'''
self.gt_boxes = filter_eval_boxes_by_visibility(self.all_gt, visibility, verbose=True)
elif type_ == 'ord':
valid_tokens = [key for (key, value) in self.index_map.items() if value == index]
self.gt_boxes = filter_by_sample_token(self.all_gt, valid_tokens)
self.pred_boxes = filter_by_sample_token(self.all_preds, valid_tokens)
self.sample_tokens = self.gt_boxes.sample_tokens
def evaluate(self) -> Tuple[DetectionMetrics, DetectionMetricDataList]:
"""
Performs the actual evaluation.
:return: A tuple of high-level and the raw metric data.
"""
start_time = time.time()
# -----------------------------------
# Step 1: Accumulate metric data for all classes and distance thresholds.
# -----------------------------------
if self.verbose:
print('Accumulating metric data...')
metric_data_list = DetectionMetricDataList()
# print(self.cfg.dist_fcn_callable, self.cfg.dist_ths)
# self.cfg.dist_ths = [0.3]
# self.cfg.dist_fcn_callable
for class_name in self.cfg.class_names:
for dist_th in self.cfg.dist_ths:
md = accumulate(self.gt_boxes, self.pred_boxes, class_name, self.cfg.dist_fcn_callable, dist_th)
metric_data_list.set(class_name, dist_th, md)
# -----------------------------------
# Step 2: Calculate metrics from the data.
# -----------------------------------
if self.verbose:
print('Calculating metrics...')
metrics = DetectionMetrics(self.cfg)
for class_name in self.cfg.class_names:
# Compute APs.
for dist_th in self.cfg.dist_ths:
metric_data = metric_data_list[(class_name, dist_th)]
ap = calc_ap(metric_data, self.cfg.min_recall, self.cfg.min_precision)
metrics.add_label_ap(class_name, dist_th, ap)
# Compute TP metrics.
for metric_name in TP_METRICS:
metric_data = metric_data_list[(class_name, self.cfg.dist_th_tp)]
if class_name in ['traffic_cone'] and metric_name in ['attr_err', 'vel_err', 'orient_err']:
tp = np.nan
elif class_name in ['barrier'] and metric_name in ['attr_err', 'vel_err']:
tp = np.nan
else:
tp = calc_tp(metric_data, self.cfg.min_recall, metric_name)
metrics.add_label_tp(class_name, metric_name, tp)
# Compute evaluation time.
metrics.add_runtime(time.time() - start_time)
return metrics, metric_data_list
def render(self, metrics: DetectionMetrics, md_list: DetectionMetricDataList) -> None:
"""
Renders various PR and TP curves.
:param metrics: DetectionMetrics instance.
:param md_list: DetectionMetricDataList instance.
"""
if self.verbose:
print('Rendering PR and TP curves')
def savepath(name):
return os.path.join(self.plot_dir, name + '.pdf')
summary_plot(md_list, metrics, min_precision=self.cfg.min_precision, min_recall=self.cfg.min_recall,
dist_th_tp=self.cfg.dist_th_tp, savepath=savepath('summary'))
for detection_name in self.cfg.class_names:
class_pr_curve(md_list, metrics, detection_name, self.cfg.min_precision, self.cfg.min_recall,
savepath=savepath(detection_name + '_pr'))
class_tp_curve(md_list, metrics, detection_name, self.cfg.min_recall, self.cfg.dist_th_tp,
savepath=savepath(detection_name + '_tp'))
for dist_th in self.cfg.dist_ths:
dist_pr_curve(md_list, metrics, dist_th, self.cfg.min_precision, self.cfg.min_recall,
savepath=savepath('dist_pr_' + str(dist_th)))
if __name__ == "__main__":
# Settings.
parser = argparse.ArgumentParser(description='Evaluate nuScenes detection results.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('result_path', type=str, help='The submission as a JSON file.')
parser.add_argument('--output_dir', type=str, default='~/nuscenes-metrics',
help='Folder to store result metrics, graphs and example visualizations.')
parser.add_argument('--eval_set', type=str, default='val',
help='Which dataset split to evaluate on, train, val or test.')
parser.add_argument('--dataroot', type=str, default='data/nuscenes',
help='Default nuScenes data directory.')
parser.add_argument('--version', type=str, default='v1.0-trainval',
help='Which version of the nuScenes dataset to evaluate on, e.g. v1.0-trainval.')
parser.add_argument('--config_path', type=str, default='',
                        help='Path to the configuration file. '
                             'If no path given, the CVPR 2019 configuration will be used.')
parser.add_argument('--plot_examples', type=int, default=0,
help='How many example visualizations to write to disk.')
parser.add_argument('--render_curves', type=int, default=1,
help='Whether to render PR and TP curves to disk.')
parser.add_argument('--verbose', type=int, default=1,
help='Whether to print to stdout.')
args = parser.parse_args()
result_path_ = os.path.expanduser(args.result_path)
output_dir_ = os.path.expanduser(args.output_dir)
eval_set_ = args.eval_set
dataroot_ = args.dataroot
version_ = args.version
config_path = args.config_path
plot_examples_ = args.plot_examples
render_curves_ = bool(args.render_curves)
verbose_ = bool(args.verbose)
if config_path == '':
cfg_ = config_factory('detection_cvpr_2019')
else:
with open(config_path, 'r') as _f:
cfg_ = DetectionConfig.deserialize(json.load(_f))
nusc_ = NuScenes(version=version_, verbose=verbose_, dataroot=dataroot_)
nusc_eval = NuScenesEval_custom(nusc_, config=cfg_, result_path=result_path_, eval_set=eval_set_,
output_dir=output_dir_, verbose=verbose_)
for vis in ['1', '2', '3', '4']:
nusc_eval.update_gt(type_='vis', visibility=vis)
print(f'================ {vis} ===============')
nusc_eval.main(plot_examples=plot_examples_, render_curves=render_curves_)
    # Alternatively, evaluate frame-by-frame within each scene:
    # for index in range(1, 41):
    #     nusc_eval.update_gt(type_='ord', index=index)
import numpy as np
import os
from pathlib import Path
from tqdm import tqdm
import pickle as pkl
import argparse
import time
import torch
import sys, platform
from sklearn.neighbors import KDTree
from termcolor import colored
from pathlib import Path
from copy import deepcopy
from functools import reduce
np.seterr(divide='ignore', invalid='ignore')
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
def pcolor(string, color, on_color=None, attrs=None):
"""
Produces a colored string for printing
Parameters
----------
string : str
String that will be colored
color : str
Color to use
on_color : str
Background color to use
attrs : list of str
Different attributes for the string
Returns
-------
string: str
Colored string
"""
return colored(string, color, on_color, attrs)
def getCellCoordinates(points, voxelSize):
    # np.int was removed in NumPy 1.24; use the builtin int instead.
    return (points / voxelSize).astype(int)
def getNumUniqueCells(cells):
M = cells.max() + 1
return np.unique(cells[:, 0] + M * cells[:, 1] + M ** 2 * cells[:, 2]).shape[0]
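# getNumUniqueCells hashes each integer cell (x, y, z) into a single index
# x + M*y + M^2*z with M = cells.max() + 1, so distinct cells map to distinct
# integers. A minimal sketch of the idea (illustrative only, not used below):
#
#   cells = np.array([[0, 0, 0], [1, 2, 3], [1, 2, 3]])
#   getNumUniqueCells(cells)  # -> 2, since the last two rows are duplicates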
class Metric_mIoU():
def __init__(self,
save_dir='.',
num_classes=18,
use_lidar_mask=False,
use_image_mask=False,
):
        self.class_names = ['others', 'barrier', 'bicycle', 'bus', 'car',
                            'construction_vehicle', 'motorcycle', 'pedestrian',
                            'traffic_cone', 'trailer', 'truck', 'driveable_surface',
                            'other_flat', 'sidewalk', 'terrain', 'manmade',
                            'vegetation', 'free']
self.save_dir = save_dir
self.use_lidar_mask = use_lidar_mask
self.use_image_mask = use_image_mask
self.num_classes = num_classes
self.point_cloud_range = [-40.0, -40.0, -1.0, 40.0, 40.0, 5.4]
self.occupancy_size = [0.4, 0.4, 0.4]
self.voxel_size = 0.4
self.occ_xdim = int((self.point_cloud_range[3] - self.point_cloud_range[0]) / self.occupancy_size[0])
self.occ_ydim = int((self.point_cloud_range[4] - self.point_cloud_range[1]) / self.occupancy_size[1])
self.occ_zdim = int((self.point_cloud_range[5] - self.point_cloud_range[2]) / self.occupancy_size[2])
self.voxel_num = self.occ_xdim * self.occ_ydim * self.occ_zdim
self.hist = np.zeros((self.num_classes, self.num_classes))
self.cnt = 0
def hist_info(self, n_cl, pred, gt):
"""
build confusion matrix
# empty classes:0
non-empty class: 0-16
free voxel class: 17
Args:
n_cl (int): num_classes_occupancy
pred (1-d array): pred_occupancy_label
gt (1-d array): gt_occupancu_label
Returns:
tuple:(hist, correctly number_predicted_labels, num_labelled_sample)
"""
assert pred.shape == gt.shape
k = (gt >= 0) & (gt < n_cl) # exclude 255
labeled = np.sum(k)
correct = np.sum((pred[k] == gt[k]))
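        # Flatten each (gt, pred) pair into a single index gt * n_cl + pred,
        # bincount those indices, and reshape into an n_cl x n_cl confusion
        # matrix whose rows are ground-truth classes and columns predictions.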
return (
np.bincount(
n_cl * gt[k].astype(int) + pred[k].astype(int), minlength=n_cl ** 2
).reshape(n_cl, n_cl),
correct,
labeled,
)
def per_class_iu(self, hist):
return np.diag(hist) / (hist.sum(1) + hist.sum(0) - np.diag(hist))
def compute_mIoU(self, pred, label, n_classes):
hist = np.zeros((n_classes, n_classes))
new_hist, correct, labeled = self.hist_info(n_classes, pred.flatten(), label.flatten())
hist += new_hist
mIoUs = self.per_class_iu(hist)
return round(np.nanmean(mIoUs) * 100, 2), hist
    def add_batch(self, semantics_pred, semantics_gt, mask_lidar, mask_camera):
self.cnt += 1
if self.use_image_mask:
masked_semantics_gt = semantics_gt[mask_camera]
masked_semantics_pred = semantics_pred[mask_camera]
elif self.use_lidar_mask:
masked_semantics_gt = semantics_gt[mask_lidar]
masked_semantics_pred = semantics_pred[mask_lidar]
else:
masked_semantics_gt = semantics_gt
masked_semantics_pred = semantics_pred
_, _hist = self.compute_mIoU(masked_semantics_pred, masked_semantics_gt, self.num_classes)
self.hist += _hist
def count_miou(self):
mIoU = self.per_class_iu(self.hist)
# assert cnt == num_samples, 'some samples are not included in the miou calculation'
print(f'===> per class IoU of {self.cnt} samples:')
for ind_class in range(self.num_classes-1):
print(f'===> {self.class_names[ind_class]} - IoU = ' + str(round(mIoU[ind_class] * 100, 2)))
print(f'===> mIoU of {self.cnt} samples: ' + str(round(np.nanmean(mIoU[:self.num_classes-1]) * 100, 2)))
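# A minimal, self-contained usage sketch for Metric_mIoU (illustrative only;
# the random arrays below stand in for real 200x200x16 occupancy labels):
def _demo_metric_miou():
    miou_metric = Metric_mIoU(num_classes=18, use_image_mask=True)
    gt = np.random.randint(0, 18, size=(200, 200, 16))
    pred = np.random.randint(0, 18, size=(200, 200, 16))
    mask_camera = np.random.rand(200, 200, 16) > 0.5
    miou_metric.add_batch(pred, gt, mask_lidar=None, mask_camera=mask_camera)
    miou_metric.count_miou()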
class Metric_FScore():
def __init__(self,
leaf_size=10,
threshold_acc=0.6,
threshold_complete=0.6,
voxel_size=[0.4, 0.4, 0.4],
range=[-40, -40, -1, 40, 40, 5.4],
void=[17, 255],
use_lidar_mask=False,
use_image_mask=False, ) -> None:
self.leaf_size = leaf_size
self.threshold_acc = threshold_acc
self.threshold_complete = threshold_complete
self.voxel_size = voxel_size
self.range = range
self.void = void
self.use_lidar_mask = use_lidar_mask
self.use_image_mask = use_image_mask
self.cnt=0
self.tot_acc = 0.
self.tot_cmpl = 0.
self.tot_f1_mean = 0.
self.eps = 1e-8
def voxel2points(self, voxel):
mask = np.logical_not(reduce(np.logical_or, [voxel == self.void[i] for i in range(len(self.void))]))
occIdx = np.where(mask)
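        # Convert occupied voxel indices to metric coordinates at voxel centers:
        # x = i * dx + dx / 2 + x_min (and likewise for y and z).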
        points = np.concatenate(
            (occIdx[0][:, None] * self.voxel_size[0] + self.voxel_size[0] / 2 + self.range[0],
             occIdx[1][:, None] * self.voxel_size[1] + self.voxel_size[1] / 2 + self.range[1],
             occIdx[2][:, None] * self.voxel_size[2] + self.voxel_size[2] / 2 + self.range[2]),
            axis=1)
return points
    def add_batch(self, semantics_pred, semantics_gt, mask_lidar, mask_camera):
self.cnt += 1
if self.use_image_mask:
            semantics_gt[np.logical_not(mask_camera)] = 255
            semantics_pred[np.logical_not(mask_camera)] = 255
        elif self.use_lidar_mask:
            semantics_gt[np.logical_not(mask_lidar)] = 255
            semantics_pred[np.logical_not(mask_lidar)] = 255
else:
pass
ground_truth = self.voxel2points(semantics_gt)
prediction = self.voxel2points(semantics_pred)
        if prediction.shape[0] == 0:
            accuracy = 0
            completeness = 0
            fmean = 0
else:
prediction_tree = KDTree(prediction, leaf_size=self.leaf_size)
ground_truth_tree = KDTree(ground_truth, leaf_size=self.leaf_size)
complete_distance, _ = prediction_tree.query(ground_truth)
complete_distance = complete_distance.flatten()
accuracy_distance, _ = ground_truth_tree.query(prediction)
accuracy_distance = accuracy_distance.flatten()
# evaluate completeness
complete_mask = complete_distance < self.threshold_complete
completeness = complete_mask.mean()
            # evaluate accuracy
accuracy_mask = accuracy_distance < self.threshold_acc
accuracy = accuracy_mask.mean()
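            # F-score as the harmonic mean of accuracy (a precision proxy) and
            # completeness (a recall proxy); eps guards against division by zero.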
fmean = 2.0 / (1 / (accuracy+self.eps) + 1 / (completeness+self.eps))
self.tot_acc += accuracy
self.tot_cmpl += completeness
self.tot_f1_mean += fmean
    def count_fscore(self):
base_color, attrs = 'red', ['bold', 'dark']
print(pcolor('\n######## F score: {} #######'.format(self.tot_f1_mean / self.cnt), base_color, attrs=attrs))
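# A minimal usage sketch for Metric_FScore (illustrative only; the random
# voxel grids below stand in for real semantic occupancy predictions):
def _demo_metric_fscore():
    fscore_metric = Metric_FScore(use_image_mask=True)
    gt = np.random.randint(0, 18, size=(200, 200, 16))
    pred = np.random.randint(0, 18, size=(200, 200, 16))
    mask_camera = np.random.rand(200, 200, 16) > 0.5
    fscore_metric.add_batch(pred, gt, mask_lidar=None, mask_camera=mask_camera)
    fscore_metric.count_fscore()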
from .transform_3d import (
PadMultiViewImage, NormalizeMultiviewImage,
PhotoMetricDistortionMultiViewImage, CustomCollect3D, RandomScaleImageMultiViewImage)
from .formating import CustomDefaultFormatBundle3D
from .loading import LoadOccGTFromFile
__all__ = [
    'PadMultiViewImage', 'NormalizeMultiviewImage',
    'PhotoMetricDistortionMultiViewImage', 'CustomDefaultFormatBundle3D',
    'CustomCollect3D', 'RandomScaleImageMultiViewImage', 'LoadOccGTFromFile'
]
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
from mmcv.parallel import DataContainer as DC
from mmdet3d.core.bbox import BaseInstance3DBoxes
from mmdet3d.core.points import BasePoints
from mmdet.datasets.builder import PIPELINES
from mmdet.datasets.pipelines import to_tensor
from mmdet3d.datasets.pipelines import DefaultFormatBundle3D
@PIPELINES.register_module()
class CustomDefaultFormatBundle3D(DefaultFormatBundle3D):
"""Default formatting bundle.
It simplifies the pipeline of formatting common fields for voxels,
including "proposals", "gt_bboxes", "gt_labels", "gt_masks" and
"gt_semantic_seg".
These fields are formatted as follows.
- img: (1)transpose, (2)to tensor, (3)to DataContainer (stack=True)
- proposals: (1)to tensor, (2)to DataContainer
- gt_bboxes: (1)to tensor, (2)to DataContainer
- gt_bboxes_ignore: (1)to tensor, (2)to DataContainer
- gt_labels: (1)to tensor, (2)to DataContainer
"""
def __call__(self, results):
"""Call function to transform and format common fields in results.
Args:
results (dict): Result dict contains the data to convert.
Returns:
dict: The result dict contains the data that is formatted with
default bundle.
"""
# Format 3D data
results = super(CustomDefaultFormatBundle3D, self).__call__(results)
results['gt_map_masks'] = DC(
to_tensor(results['gt_map_masks']), stack=True)
return results
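# A hedged example of where this bundle typically sits in a pipeline config
# (the surrounding transforms and the class_names value are assumptions):
#
# train_pipeline = [
#     ...,
#     dict(type='CustomDefaultFormatBundle3D', class_names=class_names),
#     dict(type='CustomCollect3D', keys=['gt_bboxes_3d', 'gt_labels_3d', 'img']),
# ]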
import numpy as np
from numpy import random
import mmcv
from mmdet.datasets.builder import PIPELINES
from mmcv.parallel import DataContainer as DC
import os
@PIPELINES.register_module()
class LoadOccGTFromFile(object):
"""Load multi channel images from a list of separate channel files.
Expects results['img_filename'] to be a list of filenames.
note that we read image in BGR style to align with opencv.imread
Args:
to_float32 (bool): Whether to convert the img to float32.
Defaults to False.
color_type (str): Color type of the file. Defaults to 'unchanged'.
"""
    def __init__(self, data_root):
self.data_root = data_root
def __call__(self, results):
occ_gt_path = results['occ_gt_path']
        occ_gt_path = os.path.join(self.data_root, occ_gt_path)
occ_labels = np.load(occ_gt_path)
semantics = occ_labels['semantics']
mask_lidar = occ_labels['mask_lidar']
mask_camera = occ_labels['mask_camera']
results['voxel_semantics'] = semantics
results['mask_lidar'] = mask_lidar
results['mask_camera'] = mask_camera
return results
def __repr__(self):
"""str: Return a string that describes the module."""
return "{} (data_root={}')".format(
self.__class__.__name__, self.data_root)
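# A hedged pipeline-config example (the data_root path is an assumption and
# depends on where the occupancy ground truth is stored):
#
# dict(type='LoadOccGTFromFile', data_root='data/occ3d-nuscenes')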
import numpy as np
from numpy import random
import mmcv
from mmdet.datasets.builder import PIPELINES
from mmcv.parallel import DataContainer as DC
import os
@PIPELINES.register_module()
class PadMultiViewImage(object):
"""Pad the multi-view image.
There are two padding modes: (1) pad to a fixed size and (2) pad to the
minimum size that is divisible by some number.
    Added keys are "pad_shape", "pad_fixed_size" and "pad_size_divisor".
Args:
size (tuple, optional): Fixed padding size.
size_divisor (int, optional): The divisor of padded size.
pad_val (float, optional): Padding value, 0 by default.
"""
def __init__(self, size=None, size_divisor=None, pad_val=0):
self.size = size
self.size_divisor = size_divisor
self.pad_val = pad_val
# only one of size and size_divisor should be valid
assert size is not None or size_divisor is not None
assert size is None or size_divisor is None
def _pad_img(self, results):
"""Pad images according to ``self.size``."""
if self.size is not None:
padded_img = [mmcv.impad(
img, shape=self.size, pad_val=self.pad_val) for img in results['img']]
elif self.size_divisor is not None:
padded_img = [mmcv.impad_to_multiple(
img, self.size_divisor, pad_val=self.pad_val) for img in results['img']]
results['ori_shape'] = [img.shape for img in results['img']]
results['img'] = padded_img
results['img_shape'] = [img.shape for img in padded_img]
results['pad_shape'] = [img.shape for img in padded_img]
results['pad_fixed_size'] = self.size
results['pad_size_divisor'] = self.size_divisor
def __call__(self, results):
"""Call function to pad images, masks, semantic segmentation maps.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Updated result dict.
"""
self._pad_img(results)
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(size={self.size}, '
repr_str += f'size_divisor={self.size_divisor}, '
repr_str += f'pad_val={self.pad_val})'
return repr_str
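# Example config (illustrative): pad each view so H and W are divisible by 32,
# which FPN-style backbones typically require.
#
# dict(type='PadMultiViewImage', size_divisor=32)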
@PIPELINES.register_module()
class NormalizeMultiviewImage(object):
"""Normalize the image.
Added key is "img_norm_cfg".
Args:
mean (sequence): Mean values of 3 channels.
std (sequence): Std values of 3 channels.
to_rgb (bool): Whether to convert the image from BGR to RGB,
default is true.
"""
def __init__(self, mean, std, to_rgb=True):
self.mean = np.array(mean, dtype=np.float32)
self.std = np.array(std, dtype=np.float32)
self.to_rgb = to_rgb
def __call__(self, results):
"""Call function to normalize images.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Normalized results, 'img_norm_cfg' key is added into
result dict.
"""
results['img'] = [mmcv.imnormalize(img, self.mean, self.std, self.to_rgb) for img in results['img']]
results['img_norm_cfg'] = dict(
mean=self.mean, std=self.std, to_rgb=self.to_rgb)
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(mean={self.mean}, std={self.std}, to_rgb={self.to_rgb})'
return repr_str
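# Example config (illustrative; the mean/std values are the common ImageNet
# statistics and may differ from a given model's img_norm_cfg):
#
# dict(type='NormalizeMultiviewImage',
#      mean=[123.675, 116.28, 103.53], std=[58.395, 57.12, 57.375], to_rgb=True)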
@PIPELINES.register_module()
class PhotoMetricDistortionMultiViewImage:
"""Apply photometric distortion to image sequentially, every transformation
is applied with a probability of 0.5. The position of random contrast is in
second or second to last.
1. random brightness
2. random contrast (mode 0)
3. convert color from BGR to HSV
4. random saturation
5. random hue
6. convert color from HSV to BGR
7. random contrast (mode 1)
8. randomly swap channels
Args:
brightness_delta (int): delta of brightness.
contrast_range (tuple): range of contrast.
saturation_range (tuple): range of saturation.
hue_delta (int): delta of hue.
"""
def __init__(self,
brightness_delta=32,
contrast_range=(0.5, 1.5),
saturation_range=(0.5, 1.5),
hue_delta=18):
self.brightness_delta = brightness_delta
self.contrast_lower, self.contrast_upper = contrast_range
self.saturation_lower, self.saturation_upper = saturation_range
self.hue_delta = hue_delta
def __call__(self, results):
"""Call function to perform photometric distortion on images.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Result dict with images distorted.
"""
imgs = results['img']
new_imgs = []
for img in imgs:
            assert img.dtype == np.float32, \
                'PhotoMetricDistortion needs images of dtype np.float32; ' \
                'please set "to_float32=True" in the image loading pipeline'
# random brightness
if random.randint(2):
delta = random.uniform(-self.brightness_delta,
self.brightness_delta)
img += delta
# mode == 0 --> do random contrast first
# mode == 1 --> do random contrast last
mode = random.randint(2)
if mode == 1:
if random.randint(2):
alpha = random.uniform(self.contrast_lower,
self.contrast_upper)
img *= alpha
# convert color from BGR to HSV
img = mmcv.bgr2hsv(img)
# random saturation
if random.randint(2):
img[..., 1] *= random.uniform(self.saturation_lower,
self.saturation_upper)
# random hue
if random.randint(2):
img[..., 0] += random.uniform(-self.hue_delta, self.hue_delta)
img[..., 0][img[..., 0] > 360] -= 360
img[..., 0][img[..., 0] < 0] += 360
# convert color from HSV to BGR
img = mmcv.hsv2bgr(img)
# random contrast
if mode == 0:
if random.randint(2):
alpha = random.uniform(self.contrast_lower,
self.contrast_upper)
img *= alpha
# randomly swap channels
if random.randint(2):
img = img[..., random.permutation(3)]
new_imgs.append(img)
results['img'] = new_imgs
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(\nbrightness_delta={self.brightness_delta},\n'
repr_str += 'contrast_range='
repr_str += f'{(self.contrast_lower, self.contrast_upper)},\n'
repr_str += 'saturation_range='
repr_str += f'{(self.saturation_lower, self.saturation_upper)},\n'
repr_str += f'hue_delta={self.hue_delta})'
return repr_str
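# Example usage (illustrative): the transform expects float32 BGR images, so
# it is usually placed right after a to_float32 image-loading step.
#
# dict(type='PhotoMetricDistortionMultiViewImage')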
@PIPELINES.register_module()
class CustomCollect3D(object):
"""Collect data from the loader relevant to the specific task.
This is usually the last stage of the data loader pipeline. Typically keys
is set to some subset of "img", "proposals", "gt_bboxes",
"gt_bboxes_ignore", "gt_labels", and/or "gt_masks".
The "img_meta" item is always populated. The contents of the "img_meta"
dictionary depends on "meta_keys". By default this includes:
- 'img_shape': shape of the image input to the network as a tuple \
(h, w, c). Note that images may be zero padded on the \
bottom/right if the batch tensor is larger than this shape.
- 'scale_factor': a float indicating the preprocessing scale
- 'flip': a boolean indicating if image flip transform was used
- 'filename': path to the image file
- 'ori_shape': original shape of the image as a tuple (h, w, c)
- 'pad_shape': image shape after padding
- 'lidar2img': transform from lidar to image
- 'depth2img': transform from depth to image
- 'cam2img': transform from camera to image
- 'pcd_horizontal_flip': a boolean indicating if point cloud is \
flipped horizontally
- 'pcd_vertical_flip': a boolean indicating if point cloud is \
flipped vertically
- 'box_mode_3d': 3D box mode
- 'box_type_3d': 3D box type
- 'img_norm_cfg': a dict of normalization information:
- mean: per channel mean subtraction
- std: per channel std divisor
- to_rgb: bool indicating if bgr was converted to rgb
- 'pcd_trans': point cloud transformations
- 'sample_idx': sample index
- 'pcd_scale_factor': point cloud scale factor
- 'pcd_rotation': rotation applied to point cloud
- 'pts_filename': path to point cloud file.
Args:
keys (Sequence[str]): Keys of results to be collected in ``data``.
meta_keys (Sequence[str], optional): Meta keys to be converted to
``mmcv.DataContainer`` and collected in ``data[img_metas]``.
Default: ('filename', 'ori_shape', 'img_shape', 'lidar2img',
'depth2img', 'cam2img', 'pad_shape', 'scale_factor', 'flip',
'pcd_horizontal_flip', 'pcd_vertical_flip', 'box_mode_3d',
'box_type_3d', 'img_norm_cfg', 'pcd_trans',
'sample_idx', 'pcd_scale_factor', 'pcd_rotation', 'pts_filename')
"""
def __init__(self,
keys,
meta_keys=('filename', 'ori_shape', 'img_shape', 'lidar2img','ego2lidar',
'depth2img', 'cam2img', 'pad_shape',
'scale_factor', 'flip', 'pcd_horizontal_flip',
'pcd_vertical_flip', 'box_mode_3d', 'box_type_3d',
'img_norm_cfg', 'pcd_trans', 'sample_idx', 'prev_idx', 'next_idx',
'pcd_scale_factor', 'pcd_rotation', 'pts_filename',
'transformation_3d_flow', 'scene_token',
'can_bus',
)):
self.keys = keys
self.meta_keys = meta_keys
def __call__(self, results):
"""Call function to collect keys in results. The keys in ``meta_keys``
will be converted to :obj:`mmcv.DataContainer`.
Args:
results (dict): Result dict contains the data to collect.
Returns:
dict: The result dict contains the following keys
- keys in ``self.keys``
- ``img_metas``
"""
data = {}
img_metas = {}
for key in self.meta_keys:
if key in results:
img_metas[key] = results[key]
data['img_metas'] = DC(img_metas, cpu_only=True)
for key in self.keys:
data[key] = results[key]
return data
def __repr__(self):
"""str: Return a string that describes the module."""
return self.__class__.__name__ + \
f'(keys={self.keys}, meta_keys={self.meta_keys})'
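# A hedged example for the occupancy task (the exact keys depend on the
# dataset/model config and are assumptions here):
#
# dict(type='CustomCollect3D',
#      keys=['img', 'voxel_semantics', 'mask_lidar', 'mask_camera'])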
@PIPELINES.register_module()
class RandomScaleImageMultiViewImage(object):
"""Random scale the image
Args:
scales
"""
def __init__(self, scales=[]):
self.scales = scales
        assert len(self.scales) == 1, 'exactly one scale is currently supported'
def __call__(self, results):
"""Call function to pad images, masks, semantic segmentation maps.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Updated result dict.
"""
rand_ind = np.random.permutation(range(len(self.scales)))[0]
rand_scale = self.scales[rand_ind]
y_size = [int(img.shape[0] * rand_scale) for img in results['img']]
x_size = [int(img.shape[1] * rand_scale) for img in results['img']]
scale_factor = np.eye(4)
scale_factor[0, 0] *= rand_scale
scale_factor[1, 1] *= rand_scale
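        # Resizing the images scales pixel coordinates by rand_scale, so the same
        # scaling must be prepended to every lidar-to-image projection matrix.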
results['img'] = [mmcv.imresize(img, (x_size[idx], y_size[idx]), return_scale=False) for idx, img in
enumerate(results['img'])]
lidar2img = [scale_factor @ l2i for l2i in results['lidar2img']]
results['lidar2img'] = lidar2img
results['img_shape'] = [img.shape for img in results['img']]
results['ori_shape'] = [img.shape for img in results['img']]
return results
def __repr__(self):
repr_str = self.__class__.__name__
        repr_str += f'(scales={self.scales})'
return repr_str
from .group_sampler import DistributedGroupSampler
from .distributed_sampler import DistributedSampler
from .sampler import SAMPLER, build_sampler
import math
import torch
from torch.utils.data import DistributedSampler as _DistributedSampler
from .sampler import SAMPLER
@SAMPLER.register_module()
class DistributedSampler(_DistributedSampler):
def __init__(self,
dataset=None,
num_replicas=None,
rank=None,
shuffle=True,
seed=0):
super().__init__(
dataset, num_replicas=num_replicas, rank=rank, shuffle=shuffle)
        # for compatibility with PyTorch 1.3+
self.seed = seed if seed is not None else 0
def __iter__(self):
# deterministically shuffle based on epoch
if self.shuffle:
            assert False, 'shuffle is not supported by this sequential sampler'
else:
indices = torch.arange(len(self.dataset)).tolist()
# add extra samples to make it evenly divisible
        # in case indices is shorter than half of total_size
indices = (indices *
math.ceil(self.total_size / len(indices)))[:self.total_size]
assert len(indices) == self.total_size
        # subsample: give each rank a contiguous chunk of indices (rather than
        # the default strided pattern below), e.g. so that sequentially ordered
        # samples stay together on one replica
        # indices = indices[self.rank:self.total_size:self.num_replicas]
        per_replicas = self.total_size // self.num_replicas
        indices = indices[self.rank * per_replicas:(self.rank + 1) * per_replicas]
assert len(indices) == self.num_samples
return iter(indices)