Commit 3b8d508a authored by lishj6

init_0905

parent e968ab0f
import numpy as np
import os
from pathlib import Path
from tqdm import tqdm
import pickle as pkl
import argparse
import time
import torch
import sys, platform
from sklearn.neighbors import KDTree
from termcolor import colored
from pathlib import Path
from copy import deepcopy
from functools import reduce
np.seterr(divide='ignore', invalid='ignore')
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
def pcolor(string, color, on_color=None, attrs=None):
"""
Produces a colored string for printing
Parameters
----------
string : str
String that will be colored
color : str
Color to use
on_color : str
Background color to use
attrs : list of str
Different attributes for the string
Returns
-------
string: str
Colored string
"""
return colored(string, color, on_color, attrs)
def getCellCoordinates(points, voxelSize):
    return (points / voxelSize).astype(int)
def getNumUniqueCells(cells):
M = cells.max() + 1
return np.unique(cells[:, 0] + M * cells[:, 1] + M ** 2 * cells[:, 2]).shape[0]
class Metric_mIoU():
def __init__(self,
save_dir='.',
num_classes=18,
use_lidar_mask=False,
use_image_mask=False,
):
self.class_names = ['others','barrier', 'bicycle', 'bus', 'car', 'construction_vehicle',
'motorcycle', 'pedestrian', 'traffic_cone', 'trailer', 'truck',
'driveable_surface', 'other_flat', 'sidewalk',
'terrain', 'manmade', 'vegetation','free']
self.save_dir = save_dir
self.use_lidar_mask = use_lidar_mask
self.use_image_mask = use_image_mask
self.num_classes = num_classes
self.point_cloud_range = [-40.0, -40.0, -1.0, 40.0, 40.0, 5.4]
self.occupancy_size = [0.4, 0.4, 0.4]
self.voxel_size = 0.4
self.occ_xdim = int((self.point_cloud_range[3] - self.point_cloud_range[0]) / self.occupancy_size[0])
self.occ_ydim = int((self.point_cloud_range[4] - self.point_cloud_range[1]) / self.occupancy_size[1])
self.occ_zdim = int((self.point_cloud_range[5] - self.point_cloud_range[2]) / self.occupancy_size[2])
self.voxel_num = self.occ_xdim * self.occ_ydim * self.occ_zdim
self.hist = np.zeros((self.num_classes, self.num_classes))
self.cnt = 0
def hist_info(self, n_cl, pred, gt):
"""
build confusion matrix
# empty classes:0
non-empty class: 0-16
free voxel class: 17
Args:
n_cl (int): num_classes_occupancy
pred (1-d array): pred_occupancy_label, (N_valid, )
gt (1-d array): gt_occupancu_label, (N_valid, )
Returns:
tuple:(hist, correctly number_predicted_labels, num_labelled_sample)
"""
assert pred.shape == gt.shape
k = (gt >= 0) & (gt < n_cl) # exclude 255
labeled = np.sum(k) # N_total
correct = np.sum((pred[k] == gt[k])) # N_correct
return (
np.bincount(
n_cl * gt[k].astype(int) + pred[k].astype(int), minlength=n_cl ** 2
).reshape(n_cl, n_cl), # (N_cls, N_cls),
correct, # N_correct
labeled, # N_total
)
def per_class_iu(self, hist):
return np.diag(hist) / (hist.sum(1) + hist.sum(0) - np.diag(hist))
def compute_mIoU(self, pred, label, n_classes):
"""
Args:
pred: (N_valid, )
label: (N_valid, )
n_classes: int=18
Returns:
"""
hist = np.zeros((n_classes, n_classes)) # (N_cls, N_cls)
new_hist, correct, labeled = self.hist_info(n_classes, pred.flatten(), label.flatten())
hist += new_hist # (N_cls, N_cls)
mIoUs = self.per_class_iu(hist)
# for ind_class in range(n_classes):
# print(str(round(mIoUs[ind_class] * 100, 2)))
# print('===> mIoU: ' + str(round(np.nanmean(mIoUs) * 100, 2)))
return round(np.nanmean(mIoUs) * 100, 2), hist
def add_batch(self, semantics_pred, semantics_gt, mask_lidar, mask_camera):
"""
Args:
semantics_pred: (Dx, Dy, Dz, n_cls)
semantics_gt: (Dx, Dy, Dz)
mask_lidar: (Dx, Dy, Dz)
mask_camera: (Dx, Dy, Dz)
Returns:
"""
self.cnt += 1
if self.use_image_mask:
masked_semantics_gt = semantics_gt[mask_camera] # (N_valid, )
masked_semantics_pred = semantics_pred[mask_camera] # (N_valid, )
elif self.use_lidar_mask:
masked_semantics_gt = semantics_gt[mask_lidar]
masked_semantics_pred = semantics_pred[mask_lidar]
else:
masked_semantics_gt = semantics_gt
masked_semantics_pred = semantics_pred
# # pred = np.random.randint(low=0, high=17, size=masked_semantics.shape)
_, _hist = self.compute_mIoU(masked_semantics_pred, masked_semantics_gt, self.num_classes)
        self.hist += _hist  # (N_cls, N_cls); rows index the GT class, columns the predicted class, so only diagonal entries are correct predictions.
def count_miou(self):
mIoU = self.per_class_iu(self.hist)
# assert cnt == num_samples, 'some samples are not included in the miou calculation'
print(f'===> per class IoU of {self.cnt} samples:')
for ind_class in range(self.num_classes-1):
print(f'===> {self.class_names[ind_class]} - IoU = ' + str(round(mIoU[ind_class] * 100, 2)))
print(f'===> mIoU of {self.cnt} samples: ' + str(round(np.nanmean(mIoU[:self.num_classes-1]) * 100, 2)))
# print(f'===> sample-wise averaged mIoU of {cnt} samples: ' + str(round(np.nanmean(mIoU_avg), 2)))
eval_res = dict()
# eval_res['class_name'] = self.class_names
eval_res['mIoU'] = mIoU
# eval_res['cnt'] = self.cnt
return eval_res
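
# --- Illustrative usage sketch (not part of the original file) ---
# A minimal, hedged example of how Metric_mIoU is meant to be driven, using
# synthetic (Dx, Dy, Dz) = (200, 200, 16) volumes; in the real pipeline it is
# fed from NuScenesDatasetOccpancy.evaluate(). All arrays below are made up.
def _demo_metric_miou():
    metric = Metric_mIoU(num_classes=18, use_image_mask=True)
    semantics_gt = np.random.randint(0, 18, size=(200, 200, 16))
    semantics_pred = np.random.randint(0, 18, size=(200, 200, 16))
    mask_lidar = np.ones((200, 200, 16), dtype=bool)
    mask_camera = np.ones((200, 200, 16), dtype=bool)
    metric.add_batch(semantics_pred, semantics_gt, mask_lidar, mask_camera)
    return metric.count_miou()  # dict holding the per-class IoU array under 'mIoU'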
class Metric_FScore():
def __init__(self,
leaf_size=10,
threshold_acc=0.6,
threshold_complete=0.6,
voxel_size=[0.4, 0.4, 0.4],
range=[-40, -40, -1, 40, 40, 5.4],
void=[17, 255],
use_lidar_mask=False,
use_image_mask=False, ) -> None:
self.leaf_size = leaf_size
self.threshold_acc = threshold_acc
self.threshold_complete = threshold_complete
self.voxel_size = voxel_size
self.range = range
self.void = void
self.use_lidar_mask = use_lidar_mask
self.use_image_mask = use_image_mask
self.cnt=0
self.tot_acc = 0.
self.tot_cmpl = 0.
self.tot_f1_mean = 0.
self.eps = 1e-8
def voxel2points(self, voxel):
# occIdx = torch.where(torch.logical_and(voxel != FREE, voxel != NOT_OBSERVED))
# if isinstance(voxel, np.ndarray): voxel = torch.from_numpy(voxel)
mask = np.logical_not(reduce(np.logical_or, [voxel == self.void[i] for i in range(len(self.void))]))
occIdx = np.where(mask)
points = np.concatenate((occIdx[0][:, None] * self.voxel_size[0] + self.voxel_size[0] / 2 + self.range[0], \
occIdx[1][:, None] * self.voxel_size[1] + self.voxel_size[1] / 2 + self.range[1], \
occIdx[2][:, None] * self.voxel_size[2] + self.voxel_size[2] / 2 + self.range[2]),
axis=1)
return points
def add_batch(self, semantics_pred, semantics_gt, mask_lidar, mask_camera ):
# for scene_token in tqdm(preds_dict.keys()):
self.cnt += 1
        if self.use_image_mask:
            # voxels outside the mask are set to the void label 255 (modifies the inputs in place)
            semantics_gt[~mask_camera] = 255
            semantics_pred[~mask_camera] = 255
        elif self.use_lidar_mask:
            semantics_gt[~mask_lidar] = 255
            semantics_pred[~mask_lidar] = 255
else:
pass
ground_truth = self.voxel2points(semantics_gt)
prediction = self.voxel2points(semantics_pred)
if prediction.shape[0] == 0:
accuracy=0
completeness=0
fmean=0
else:
prediction_tree = KDTree(prediction, leaf_size=self.leaf_size)
ground_truth_tree = KDTree(ground_truth, leaf_size=self.leaf_size)
complete_distance, _ = prediction_tree.query(ground_truth)
complete_distance = complete_distance.flatten()
accuracy_distance, _ = ground_truth_tree.query(prediction)
accuracy_distance = accuracy_distance.flatten()
# evaluate completeness
complete_mask = complete_distance < self.threshold_complete
completeness = complete_mask.mean()
            # evaluate accuracy
accuracy_mask = accuracy_distance < self.threshold_acc
accuracy = accuracy_mask.mean()
fmean = 2.0 / (1 / (accuracy+self.eps) + 1 / (completeness+self.eps))
self.tot_acc += accuracy
self.tot_cmpl += completeness
self.tot_f1_mean += fmean
def count_fscore(self,):
base_color, attrs = 'red', ['bold', 'dark']
print(pcolor('\n######## F score: {} #######'.format(self.tot_f1_mean / self.cnt), base_color, attrs=attrs))
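
# --- Illustrative usage sketch (not part of the original file) ---
# Metric_FScore only needs the two voxel grids; voxels labelled 17 (free) or
# 255 (void) are dropped before the KDTree queries. A small synthetic grid is
# used here purely to keep the sketch cheap; real grids are (200, 200, 16).
def _demo_metric_fscore():
    metric = Metric_FScore(use_lidar_mask=False, use_image_mask=False)
    semantics_gt = np.random.randint(0, 18, size=(50, 50, 8))
    semantics_pred = np.random.randint(0, 18, size=(50, 50, 8))
    dummy_mask = np.ones((50, 50, 8), dtype=bool)
    metric.add_batch(semantics_pred, semantics_gt, dummy_mask, dummy_mask)
    metric.count_fscore()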
# Acknowledgments: https://github.com/tarashakhurana/4d-occ-forecasting
# Modified by Haisong Liu
import math
import copy
import numpy as np
import torch
from torch.utils.cpp_extension import load
from tqdm import tqdm
from prettytable import PrettyTable
from .ray_pq import Metric_RayPQ
dvr = load("dvr", sources=["lib/dvr/dvr.cpp", "lib/dvr/dvr.cu"], verbose=True, extra_cuda_cflags=['-allow-unsupported-compiler'])
_pc_range = [-40, -40, -1.0, 40, 40, 5.4]
_voxel_size = 0.4
occ_class_names = [
'others', 'barrier', 'bicycle', 'bus', 'car', 'construction_vehicle',
'motorcycle', 'pedestrian', 'traffic_cone', 'trailer', 'truck',
'driveable_surface', 'other_flat', 'sidewalk',
'terrain', 'manmade', 'vegetation', 'free'
]
# https://github.com/tarashakhurana/4d-occ-forecasting/blob/ff986082cd6ea10e67ab7839bf0e654736b3f4e2/test_fgbg.py#L29C1-L46C16
def get_rendered_pcds(origin, points, tindex, pred_dist):
pcds = []
for t in range(len(origin)):
mask = (tindex == t)
# skip the ones with no data
if not mask.any():
continue
_pts = points[mask, :3]
# use ground truth lidar points for the raycasting direction
v = _pts - origin[t][None, :]
d = v / np.sqrt((v ** 2).sum(axis=1, keepdims=True))
pred_pts = origin[t][None, :] + d * pred_dist[mask][:, None]
pcds.append(torch.from_numpy(pred_pts))
return pcds
def meshgrid3d(occ_size, pc_range):
W, H, D = occ_size
xs = torch.linspace(0.5, W - 0.5, W).view(W, 1, 1).expand(W, H, D) / W
ys = torch.linspace(0.5, H - 0.5, H).view(1, H, 1).expand(W, H, D) / H
zs = torch.linspace(0.5, D - 0.5, D).view(1, 1, D).expand(W, H, D) / D
xs = xs * (pc_range[3] - pc_range[0]) + pc_range[0]
ys = ys * (pc_range[4] - pc_range[1]) + pc_range[1]
zs = zs * (pc_range[5] - pc_range[2]) + pc_range[2]
xyz = torch.stack((xs, ys, zs), -1)
return xyz
def generate_lidar_rays():
# prepare lidar ray angles
pitch_angles = []
for k in range(10):
angle = math.pi / 2 - math.atan(k + 1)
pitch_angles.append(-angle)
# nuscenes lidar fov: [0.2107773983152201, -0.5439104895672159] (rad)
while pitch_angles[-1] < 0.21:
delta = pitch_angles[-1] - pitch_angles[-2]
pitch_angles.append(pitch_angles[-1] + delta)
lidar_rays = []
for pitch_angle in pitch_angles:
for azimuth_angle in np.arange(0, 360, 1):
azimuth_angle = np.deg2rad(azimuth_angle)
x = np.cos(pitch_angle) * np.cos(azimuth_angle)
y = np.cos(pitch_angle) * np.sin(azimuth_angle)
z = np.sin(pitch_angle)
lidar_rays.append((x, y, z))
return np.array(lidar_rays, dtype=np.float32)
def process_one_sample(sem_pred, lidar_rays, output_origin, instance_pred=None):
# lidar origin in ego coordinate
# lidar_origin = torch.tensor([[[0.9858, 0.0000, 1.8402]]])
T = output_origin.shape[1]
pred_pcds_t = []
free_id = len(occ_class_names) - 1
occ_pred = copy.deepcopy(sem_pred)
occ_pred[sem_pred < free_id] = 1
occ_pred[sem_pred == free_id] = 0
occ_pred = occ_pred.permute(2, 1, 0)
occ_pred = occ_pred[None, None, :].contiguous().float()
offset = torch.Tensor(_pc_range[:3])[None, None, :]
scaler = torch.Tensor([_voxel_size] * 3)[None, None, :]
lidar_tindex = torch.zeros([1, lidar_rays.shape[0]])
for t in range(T):
lidar_origin = output_origin[:, t:t+1, :] # [1, 1, 3]
lidar_endpts = lidar_rays[None] + lidar_origin # [1, 15840, 3]
output_origin_render = ((lidar_origin - offset) / scaler).float() # [1, 1, 3]
output_points_render = ((lidar_endpts - offset) / scaler).float() # [1, N, 3]
output_tindex_render = lidar_tindex # [1, N], all zeros
with torch.no_grad():
pred_dist, _, coord_index = dvr.render_forward(
occ_pred.cuda(),
output_origin_render.cuda(),
output_points_render.cuda(),
output_tindex_render.cuda(),
[1, 16, 200, 200],
"test"
)
pred_dist *= _voxel_size
pred_pcds = get_rendered_pcds(
lidar_origin[0].cpu().numpy(),
lidar_endpts[0].cpu().numpy(),
lidar_tindex[0].cpu().numpy(),
pred_dist[0].cpu().numpy()
)
coord_index = coord_index[0, :, :].long().cpu() # [N, 3]
pred_label = sem_pred[coord_index[:, 0], coord_index[:, 1], coord_index[:, 2]][:, None] # [N, 1]
pred_dist = pred_dist[0, :, None].cpu()
if instance_pred is not None:
pred_instance = instance_pred[coord_index[:, 0], coord_index[:, 1], coord_index[:, 2]][:, None] # [N, 1]
pred_pcds = torch.cat([pred_label.float(), pred_instance.float(), pred_dist], dim=-1)
else:
pred_pcds = torch.cat([pred_label.float(), pred_dist], dim=-1)
pred_pcds_t.append(pred_pcds)
pred_pcds_t = torch.cat(pred_pcds_t, dim=0)
return pred_pcds_t.numpy()
def calc_metrics(pcd_pred_list, pcd_gt_list):
thresholds = [1, 2, 4]
gt_cnt = np.zeros([len(occ_class_names)])
pred_cnt = np.zeros([len(occ_class_names)])
tp_cnt = np.zeros([len(thresholds), len(occ_class_names)])
for pcd_pred, pcd_gt in zip(pcd_pred_list, pcd_gt_list):
for j, threshold in enumerate(thresholds):
# L1
depth_pred = pcd_pred[:, 1]
depth_gt = pcd_gt[:, 1]
l1_error = np.abs(depth_pred - depth_gt)
tp_dist_mask = (l1_error < threshold)
for i, cls in enumerate(occ_class_names):
cls_id = occ_class_names.index(cls)
cls_mask_pred = (pcd_pred[:, 0] == cls_id)
cls_mask_gt = (pcd_gt[:, 0] == cls_id)
gt_cnt_i = cls_mask_gt.sum()
pred_cnt_i = cls_mask_pred.sum()
if j == 0:
gt_cnt[i] += gt_cnt_i
pred_cnt[i] += pred_cnt_i
tp_cls = cls_mask_gt & cls_mask_pred # [N]
tp_mask = np.logical_and(tp_cls, tp_dist_mask)
tp_cnt[j][i] += tp_mask.sum()
iou_list = []
for j, threshold in enumerate(thresholds):
iou_list.append((tp_cnt[j] / (gt_cnt + pred_cnt - tp_cnt[j]))[:-1])
return iou_list
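
# --- Worked toy example (not part of the original file) ---
# Each rendered ray is a (class_id, depth) pair; a ray counts as a true
# positive at threshold t only if the predicted class matches the GT class and
# the L1 depth error is below t, so RayIoU@t per class is tp / (gt + pred - tp).
# With the three hand-made rays below (class 4 = car, 7 = pedestrian), 'car'
# gets IoU 1/2 at every threshold and 'pedestrian' gets 0 because its depth
# error is 4 m; classes that never appear come out as NaN.
def _demo_calc_metrics():
    pcd_gt = np.array([[4.0, 10.0], [4.0, 20.0], [7.0, 5.0]])
    pcd_pred = np.array([[4.0, 10.5], [7.0, 20.0], [7.0, 9.0]])
    return calc_metrics([pcd_pred], [pcd_gt])  # per-class IoU arrays for t = 1, 2, 4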
def main_raypq(sem_pred_list, sem_gt_list, inst_pred_list, inst_gt_list, lidar_origin_list):
torch.cuda.empty_cache()
eval_metrics_pq = Metric_RayPQ(
num_classes=len(occ_class_names),
thresholds=[1, 2, 4]
)
# generate lidar rays
lidar_rays = generate_lidar_rays()
lidar_rays = torch.from_numpy(lidar_rays)
for sem_pred, sem_gt, inst_pred, inst_gt, lidar_origins in \
tqdm(zip(sem_pred_list, sem_gt_list, inst_pred_list, inst_gt_list, lidar_origin_list), ncols=50):
sem_pred = torch.from_numpy(np.reshape(sem_pred, [200, 200, 16]))
sem_gt = torch.from_numpy(np.reshape(sem_gt, [200, 200, 16]))
inst_pred = torch.from_numpy(np.reshape(inst_pred, [200, 200, 16]))
inst_gt = torch.from_numpy(np.reshape(inst_gt, [200, 200, 16]))
pcd_pred = process_one_sample(sem_pred, lidar_rays, lidar_origins, instance_pred=inst_pred)
pcd_gt = process_one_sample(sem_gt, lidar_rays, lidar_origins, instance_pred=inst_gt)
        # evaluate on non-free rays
valid_mask = (pcd_gt[:, 0].astype(np.int32) != len(occ_class_names) - 1)
pcd_pred = pcd_pred[valid_mask]
pcd_gt = pcd_gt[valid_mask]
assert pcd_pred.shape == pcd_gt.shape
sem_gt = pcd_gt[:, 0].astype(np.int32)
sem_pred = pcd_pred[:, 0].astype(np.int32)
instances_gt = pcd_gt[:, 1].astype(np.int32)
instances_pred = pcd_pred[:, 1].astype(np.int32)
# L1
depth_gt = pcd_gt[:, 2]
depth_pred = pcd_pred[:, 2]
l1_error = np.abs(depth_pred - depth_gt)
eval_metrics_pq.add_batch(sem_pred, sem_gt, instances_pred, instances_gt, l1_error)
torch.cuda.empty_cache()
return eval_metrics_pq.count_pq()
def main(sem_pred_list, sem_gt_list, lidar_origin_list):
torch.cuda.empty_cache()
# generate lidar rays
lidar_rays = generate_lidar_rays()
lidar_rays = torch.from_numpy(lidar_rays)
pcd_pred_list, pcd_gt_list = [], []
for sem_pred, sem_gt, lidar_origins in tqdm(zip(sem_pred_list, sem_gt_list, lidar_origin_list), ncols=50):
sem_pred = torch.from_numpy(np.reshape(sem_pred, [200, 200, 16]))
sem_gt = torch.from_numpy(np.reshape(sem_gt, [200, 200, 16]))
pcd_pred = process_one_sample(sem_pred, lidar_rays, lidar_origins)
pcd_gt = process_one_sample(sem_gt, lidar_rays, lidar_origins)
        # evaluate on non-free rays
valid_mask = (pcd_gt[:, 0].astype(np.int32) != len(occ_class_names) - 1)
pcd_pred = pcd_pred[valid_mask]
pcd_gt = pcd_gt[valid_mask]
assert pcd_pred.shape == pcd_gt.shape
pcd_pred_list.append(pcd_pred)
pcd_gt_list.append(pcd_gt)
iou_list = calc_metrics(pcd_pred_list, pcd_gt_list)
rayiou = np.nanmean(iou_list)
rayiou_0 = np.nanmean(iou_list[0])
rayiou_1 = np.nanmean(iou_list[1])
rayiou_2 = np.nanmean(iou_list[2])
table = PrettyTable([
'Class Names',
'RayIoU@1', 'RayIoU@2', 'RayIoU@4'
])
table.float_format = '.3'
for i in range(len(occ_class_names) - 1):
table.add_row([
occ_class_names[i],
iou_list[0][i], iou_list[1][i], iou_list[2][i]
], divider=(i == len(occ_class_names) - 2))
table.add_row(['MEAN', rayiou_0, rayiou_1, rayiou_2])
print(table)
torch.cuda.empty_cache()
return {
'RayIoU': rayiou,
'RayIoU@1': rayiou_0,
'RayIoU@2': rayiou_1,
'RayIoU@4': rayiou_2,
}
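
# --- Illustrative usage sketch (not part of the original file) ---
# `main` expects flat semantic volumes (reshaped internally to 200x200x16)
# plus one [1, T, 3] tensor of ray-casting origins per sample, as produced by
# EgoPoseDataset. It needs a CUDA device and the compiled `dvr` extension, so
# this sketch only documents the calling convention with synthetic inputs.
def _demo_rayiou():
    sem_gt = np.random.randint(0, 18, size=(200 * 200 * 16,))
    sem_pred = np.random.randint(0, 18, size=(200 * 200 * 16,))
    origins = torch.zeros(1, 2, 3)  # two made-up query origins in the reference frame
    return main([sem_pred], [sem_gt], [origins])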
import numpy as np
from prettytable import PrettyTable
class Metric_RayPQ:
def __init__(self,
num_classes=18,
thresholds=[1, 2, 4]):
"""
Args:
ignore_index (llist): Class ids that not be considered in pq counting.
"""
if num_classes == 18:
self.class_names = [
'others','barrier', 'bicycle', 'bus', 'car', 'construction_vehicle',
'motorcycle', 'pedestrian', 'traffic_cone', 'trailer', 'truck',
'driveable_surface', 'other_flat', 'sidewalk',
'terrain', 'manmade', 'vegetation','free'
]
else:
raise ValueError
self.num_classes = num_classes
self.id_offset = 2 ** 16
self.eps = 1e-5
self.thresholds = thresholds
self.min_num_points = 10
self.include = np.array(
[n for n in range(self.num_classes - 1)],
dtype=int)
self.cnt = 0
# panoptic stuff
self.pan_tp = np.zeros([len(self.thresholds), num_classes], dtype=int)
self.pan_iou = np.zeros([len(self.thresholds), num_classes], dtype=np.double)
self.pan_fp = np.zeros([len(self.thresholds), num_classes], dtype=int)
self.pan_fn = np.zeros([len(self.thresholds), num_classes], dtype=int)
def add_batch(self,semantics_pred,semantics_gt,instances_pred,instances_gt, l1_error):
self.cnt += 1
self.add_panoptic_sample(semantics_pred, semantics_gt, instances_pred, instances_gt, l1_error)
def add_panoptic_sample(self, semantics_pred, semantics_gt, instances_pred, instances_gt, l1_error):
"""Add one sample of panoptic predictions and ground truths for
evaluation.
        Args:
            semantics_pred (np.ndarray): Semantic predictions.
            semantics_gt (np.ndarray): Semantic ground truths.
            instances_pred (np.ndarray): Instance predictions.
            instances_gt (np.ndarray): Instance ground truths.
            l1_error (np.ndarray): Per-ray L1 depth error between prediction and ground truth.
        """
# get instance_class_id from instance_gt
instance_class_ids = [self.num_classes - 1]
for i in range(1, instances_gt.max() + 1):
class_id = np.unique(semantics_gt[instances_gt == i])
# assert class_id.shape[0] == 1, "each instance must belong to only one class"
if class_id.shape[0] == 1:
instance_class_ids.append(class_id[0])
else:
instance_class_ids.append(self.num_classes - 1)
instance_class_ids = np.array(instance_class_ids)
instance_count = 1
final_instance_class_ids = []
final_instances = np.zeros_like(instances_gt) # empty space has instance id "0"
for class_id in range(self.num_classes - 1):
if np.sum(semantics_gt == class_id) == 0:
continue
if self.class_names[class_id] in ['car', 'truck', 'construction_vehicle', 'bus', 'trailer', 'motorcycle', 'bicycle', 'pedestrian']:
# treat as instances
for instance_id in range(len(instance_class_ids)):
if instance_class_ids[instance_id] != class_id:
continue
final_instances[instances_gt == instance_id] = instance_count
instance_count += 1
final_instance_class_ids.append(class_id)
else:
# treat as semantics
final_instances[semantics_gt == class_id] = instance_count
instance_count += 1
final_instance_class_ids.append(class_id)
instances_gt = final_instances
# avoid zero (ignored label)
instances_pred = instances_pred + 1
instances_gt = instances_gt + 1
for j, threshold in enumerate(self.thresholds):
tp_dist_mask = l1_error < threshold
# for each class (except the ignored ones)
for cl in self.include:
# get a class mask
pred_inst_in_cl_mask = semantics_pred == cl
gt_inst_in_cl_mask = semantics_gt == cl
# get instance points in class (makes outside stuff 0)
pred_inst_in_cl = instances_pred * pred_inst_in_cl_mask.astype(int)
gt_inst_in_cl = instances_gt * gt_inst_in_cl_mask.astype(int)
# generate the areas for each unique instance prediction
unique_pred, counts_pred = np.unique(
pred_inst_in_cl[pred_inst_in_cl > 0], return_counts=True)
id2idx_pred = {id: idx for idx, id in enumerate(unique_pred)}
matched_pred = np.array([False] * unique_pred.shape[0])
# generate the areas for each unique instance gt_np
unique_gt, counts_gt = np.unique(
gt_inst_in_cl[gt_inst_in_cl > 0], return_counts=True)
id2idx_gt = {id: idx for idx, id in enumerate(unique_gt)}
matched_gt = np.array([False] * unique_gt.shape[0])
# generate intersection using offset
valid_combos = np.logical_and(pred_inst_in_cl > 0,
gt_inst_in_cl > 0)
# add dist_mask
valid_combos = np.logical_and(valid_combos, tp_dist_mask)
id_offset_combo = pred_inst_in_cl[
valid_combos] + self.id_offset * gt_inst_in_cl[valid_combos]
unique_combo, counts_combo = np.unique(
id_offset_combo, return_counts=True)
# generate an intersection map
# count the intersections with over 0.5 IoU as TP
gt_labels = unique_combo // self.id_offset
pred_labels = unique_combo % self.id_offset
gt_areas = np.array([counts_gt[id2idx_gt[id]] for id in gt_labels])
pred_areas = np.array(
[counts_pred[id2idx_pred[id]] for id in pred_labels])
intersections = counts_combo
unions = gt_areas + pred_areas - intersections
ious = intersections.astype(float) / unions.astype(float)
tp_indexes = ious > 0.5
self.pan_tp[j][cl] += np.sum(tp_indexes)
self.pan_iou[j][cl] += np.sum(ious[tp_indexes])
matched_gt[[id2idx_gt[id] for id in gt_labels[tp_indexes]]] = True
matched_pred[[id2idx_pred[id]
for id in pred_labels[tp_indexes]]] = True
# count the FN
if len(counts_gt) > 0:
self.pan_fn[j][cl] += np.sum(
np.logical_and(counts_gt >= self.min_num_points,
~matched_gt))
# count the FP
if len(matched_pred) > 0:
self.pan_fp[j][cl] += np.sum(
np.logical_and(counts_pred >= self.min_num_points,
~matched_pred))
def count_pq(self):
sq_all = self.pan_iou.astype(np.double) / np.maximum(
self.pan_tp.astype(np.double), self.eps)
rq_all = self.pan_tp.astype(np.double) / np.maximum(
self.pan_tp.astype(np.double) + 0.5 * self.pan_fp.astype(np.double)
+ 0.5 * self.pan_fn.astype(np.double), self.eps)
pq_all = sq_all * rq_all
# mask classes not occurring in dataset
mask = (self.pan_tp + self.pan_fp + self.pan_fn) > 0
pq_all[~mask] = float('nan')
table = PrettyTable([
'Class Names',
'RayPQ@%d' % self.thresholds[0],
'RayPQ@%d' % self.thresholds[1],
'RayPQ@%d' % self.thresholds[2]
])
table.float_format = '.3'
for i in range(len(self.class_names) - 1):
table.add_row([
self.class_names[i],
pq_all[0][i], pq_all[1][i], pq_all[2][i],
], divider=(i == len(self.class_names) - 2))
table.add_row([
'MEAN',
np.nanmean(pq_all[0]), np.nanmean(pq_all[1]), np.nanmean(pq_all[2])
])
print(table)
return {
'RayPQ': np.nanmean(pq_all),
'RayPQ@1': np.nanmean(pq_all[0]),
'RayPQ@2': np.nanmean(pq_all[1]),
'RayPQ@4': np.nanmean(pq_all[2]),
}
# Copyright (c) OpenMMLab. All rights reserved.
from .ema import MEGVIIEMAHook
from .utils import is_parallel
from .sequentialcontrol import SequentialControlHook
from .syncbncontrol import SyncbnControlHook
__all__ = ['MEGVIIEMAHook', 'SequentialControlHook', 'is_parallel',
'SyncbnControlHook']
# Copyright (c) OpenMMLab. All rights reserved.
# modified from megvii-bevdepth.
import math
import os
from copy import deepcopy
import torch
from mmcv.runner import load_state_dict
from mmcv.runner.dist_utils import master_only
from mmcv.runner.hooks import HOOKS, Hook
from .utils import is_parallel
__all__ = ['ModelEMA']
class ModelEMA:
"""Model Exponential Moving Average from https://github.com/rwightman/
pytorch-image-models Keep a moving average of everything in the model
state_dict (parameters and buffers).
This is intended to allow functionality like
https://www.tensorflow.org/api_docs/python/tf/train/
ExponentialMovingAverage
A smoothed version of the weights is necessary for some training
schemes to perform well.
This class is sensitive where it is initialized in the sequence
of model init, GPU assignment and distributed training wrappers.
"""
def __init__(self, model, decay=0.9999, updates=0):
"""
Args:
model (nn.Module): model to apply EMA.
            decay (float): EMA decay rate.
updates (int): counter of EMA updates.
"""
# Create EMA(FP32)
self.ema_model = deepcopy(model).eval()
self.ema = self.ema_model.module.module if is_parallel(
self.ema_model.module) else self.ema_model.module
self.updates = updates
# decay exponential ramp (to help early epochs)
self.decay = lambda x: decay * (1 - math.exp(-x / 2000))
for p in self.ema.parameters():
p.requires_grad_(False)
def update(self, trainer, model):
# Update EMA parameters
with torch.no_grad():
self.updates += 1
d = self.decay(self.updates)
msd = model.module.state_dict() if is_parallel(
model) else model.state_dict() # model state_dict
for k, v in self.ema.state_dict().items():
if v.dtype.is_floating_point:
v *= d
v += (1.0 - d) * msd[k].detach()
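
# --- Illustrative sketch (not part of the original file) ---
# The effective decay ramps from ~0 towards `decay` as updates accumulate,
# d(x) = decay * (1 - exp(-x / 2000)), and every floating-point entry of the
# EMA state dict is then blended as ema = d * ema + (1 - d) * param.
def _demo_ema_decay(decay=0.9999):
    def ramp(x):
        return decay * (1 - math.exp(-x / 2000))
    # sample the ramp at a few update counts to see it approach `decay`
    return {x: round(ramp(x), 4) for x in (1, 100, 1000, 10000)}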
@HOOKS.register_module()
class MEGVIIEMAHook(Hook):
"""EMAHook used in BEVDepth.
Modified from https://github.com/Megvii-Base
Detection/BEVDepth/blob/main/callbacks/ema.py.
"""
def __init__(self, init_updates=0, decay=0.9990, resume=None):
super().__init__()
self.init_updates = init_updates
self.resume = resume
self.decay = decay
def before_run(self, runner):
from torch.nn.modules.batchnorm import SyncBatchNorm
bn_model_list = list()
bn_model_dist_group_list = list()
for model_ref in runner.model.modules():
if isinstance(model_ref, SyncBatchNorm):
bn_model_list.append(model_ref)
bn_model_dist_group_list.append(model_ref.process_group)
model_ref.process_group = None
runner.ema_model = ModelEMA(runner.model, self.decay)
for bn_model, dist_group in zip(bn_model_list,
bn_model_dist_group_list):
bn_model.process_group = dist_group
runner.ema_model.updates = self.init_updates
if self.resume is not None:
runner.logger.info(f'resume ema checkpoint from {self.resume}')
cpt = torch.load(self.resume, map_location='cpu')
load_state_dict(runner.ema_model.ema, cpt['state_dict'])
runner.ema_model.updates = cpt['updates']
def after_train_iter(self, runner):
runner.ema_model.update(runner, runner.model.module)
def after_train_epoch(self, runner):
        # if self.is_last_epoch(runner):  # only save the EMA weights of the last epoch
self.save_checkpoint(runner)
@master_only
def save_checkpoint(self, runner):
state_dict = runner.ema_model.ema.state_dict()
ema_checkpoint = {
'epoch': runner.epoch,
'state_dict': state_dict,
'updates': runner.ema_model.updates
}
save_path = f'epoch_{runner.epoch+1}_ema.pth'
save_path = os.path.join(runner.work_dir, save_path)
torch.save(ema_checkpoint, save_path)
runner.logger.info(f'Saving ema checkpoint at {save_path}')
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.runner.hooks import HOOKS, Hook
from .utils import is_parallel
__all__ = ['SequentialControlHook']
@HOOKS.register_module()
class SequentialControlHook(Hook):
""" """
def __init__(self, temporal_start_epoch=1):
super().__init__()
self.temporal_start_epoch=temporal_start_epoch
def set_temporal_flag(self, runner, flag):
if is_parallel(runner.model.module):
runner.model.module.module.with_prev=flag
else:
runner.model.module.with_prev = flag
def before_run(self, runner):
self.set_temporal_flag(runner, False)
def before_train_epoch(self, runner):
if runner.epoch > self.temporal_start_epoch:
self.set_temporal_flag(runner, True)
# Copyright (c) OpenMMLab. All rights reserved.
from mmcv.runner.hooks import HOOKS, Hook
from .utils import is_parallel
from torch.nn import SyncBatchNorm
__all__ = ['SyncbnControlHook']
@HOOKS.register_module()
class SyncbnControlHook(Hook):
""" """
def __init__(self, syncbn_start_epoch=1):
super().__init__()
self.is_syncbn=False
self.syncbn_start_epoch = syncbn_start_epoch
def cvt_syncbn(self, runner):
if is_parallel(runner.model.module):
runner.model.module.module=\
SyncBatchNorm.convert_sync_batchnorm(runner.model.module.module,
process_group=None)
else:
runner.model.module=\
SyncBatchNorm.convert_sync_batchnorm(runner.model.module,
process_group=None)
def before_train_epoch(self, runner):
        if runner.epoch >= self.syncbn_start_epoch and not self.is_syncbn:
            print('start using SyncBN')
self.cvt_syncbn(runner)
self.is_syncbn=True
# Copyright (c) OpenMMLab. All rights reserved.
from torch import nn
__all__ = ['is_parallel']
def is_parallel(model):
"""check if model is in parallel mode."""
parallel_type = (
nn.parallel.DataParallel,
nn.parallel.DistributedDataParallel,
)
return isinstance(model, parallel_type)
from .box3d_nms import nms_bev
# Copyright (c) OpenMMLab. All rights reserved.
import numba
import numpy as np
import torch
from mmcv.ops import nms, nms_rotated
# This function duplicates functionality of mmcv.ops.iou_3d.nms_bev
# from mmcv<=1.5, but using cuda ops from mmcv.ops.nms.nms_rotated.
# Nms api will be unified in mmdetection3d one day.
def nms_bev(boxes, scores, thresh, pre_max_size=None, post_max_size=None,
xyxyr2xywhr=True):
"""NMS function GPU implementation (for BEV boxes). The overlap of two
boxes for IoU calculation is defined as the exact overlapping area of the
two boxes. In this function, one can also set ``pre_max_size`` and
``post_max_size``.
Args:
boxes (torch.Tensor): Input boxes with the shape of [N, 5]
([x1, y1, x2, y2, ry]).
scores (torch.Tensor): Scores of boxes with the shape of [N].
thresh (float): Overlap threshold of NMS.
pre_max_size (int, optional): Max size of boxes before NMS.
Default: None.
post_max_size (int, optional): Max size of boxes after NMS.
Default: None.
Returns:
torch.Tensor: Indexes after NMS.
"""
assert boxes.size(1) == 5, 'Input boxes shape should be [N, 5]'
order = scores.sort(0, descending=True)[1]
if pre_max_size is not None:
order = order[:pre_max_size]
boxes = boxes[order].contiguous()
scores = scores[order]
# xyxyr -> back to xywhr
# note: better skip this step before nms_bev call in the future
if xyxyr2xywhr:
boxes = torch.stack(
((boxes[:, 0] + boxes[:, 2]) / 2, (boxes[:, 1] + boxes[:, 3]) / 2,
boxes[:, 2] - boxes[:, 0], boxes[:, 3] - boxes[:, 1], boxes[:, 4]),
dim=-1)
keep = nms_rotated(boxes, scores, thresh)[1]
keep = order[keep]
if post_max_size is not None:
keep = keep[:post_max_size]
return keep
# This function duplicates functionality of mmcv.ops.iou_3d.nms_normal_bev
# from mmcv<=1.5, but using cuda ops from mmcv.ops.nms.nms.
# Nms api will be unified in mmdetection3d one day.
def nms_normal_bev(boxes, scores, thresh):
"""Normal NMS function GPU implementation (for BEV boxes). The overlap of
two boxes for IoU calculation is defined as the exact overlapping area of
the two boxes WITH their yaw angle set to 0.
Args:
boxes (torch.Tensor): Input boxes with shape (N, 5).
scores (torch.Tensor): Scores of predicted boxes with shape (N).
thresh (float): Overlap threshold of NMS.
Returns:
torch.Tensor: Remaining indices with scores in descending order.
"""
assert boxes.shape[1] == 5, 'Input boxes shape should be [N, 5]'
return nms(boxes[:, :-1], scores, thresh)[1]
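
# --- Illustrative usage sketch (not part of the original file) ---
# Minimal call of nms_bev on three hand-made BEV boxes ([x1, y1, x2, y2, ry]);
# the first two overlap almost completely, so the lower-scoring one is
# suppressed. Requires mmcv built with the rotated-NMS op.
def _demo_nms_bev():
    boxes = torch.tensor([[0.0, 0.0, 2.0, 4.0, 0.0],
                          [0.1, 0.0, 2.1, 4.0, 0.0],
                          [10.0, 10.0, 12.0, 14.0, 0.0]])
    scores = torch.tensor([0.9, 0.8, 0.7])
    return nms_bev(boxes, scores, thresh=0.5)  # keeps the boxes with scores 0.9 and 0.7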
from .nuscenes_dataset_bevdet import NuScenesDatasetBEVDet
from .nuscenes_dataset_occ import NuScenesDatasetOccpancy
from .pipelines import *
__all__ = ['NuScenesDatasetBEVDet', 'NuScenesDatasetOccpancy']
import torch
import numpy as np
from pyquaternion import Quaternion
from torch.utils.data import Dataset
np.set_printoptions(precision=3, suppress=True)
def trans_matrix(T, R):
tm = np.eye(4)
tm[:3, :3] = R.rotation_matrix
tm[:3, 3] = T
return tm
class EgoPoseDataset(Dataset):
def __init__(self, data_infos):
super(EgoPoseDataset, self).__init__()
self.data_infos = data_infos
self.scene_frames = {}
for info in data_infos:
scene_token = self.get_scene_token(info)
if scene_token not in self.scene_frames:
self.scene_frames[scene_token] = []
self.scene_frames[scene_token].append(info)
def __len__(self):
return len(self.data_infos)
def get_scene_token(self, info):
if 'scene_token' in info:
scene_name = info['scene_token']
else:
scene_name = info['occ_path'].split('occupancy/')[-1].split('/')[0]
return scene_name
def get_ego_from_lidar(self, info):
ego_from_lidar = trans_matrix(
np.array(info['lidar2ego_translation']),
Quaternion(info['lidar2ego_rotation']))
return ego_from_lidar
def get_global_pose(self, info, inverse=False):
global_from_ego = trans_matrix(
np.array(info['ego2global_translation']),
Quaternion(info['ego2global_rotation']))
ego_from_lidar = trans_matrix(
np.array(info['lidar2ego_translation']),
Quaternion(info['lidar2ego_rotation']))
pose = global_from_ego.dot(ego_from_lidar)
if inverse:
pose = np.linalg.inv(pose)
return pose
def __getitem__(self, idx):
info = self.data_infos[idx]
ref_sample_token = info['token']
ref_lidar_from_global = self.get_global_pose(info, inverse=True)
ref_ego_from_lidar = self.get_ego_from_lidar(info)
scene_token = self.get_scene_token(info)
scene_frame = self.scene_frames[scene_token]
ref_index = scene_frame.index(info)
# NOTE: getting output frames
output_origin_list = []
for curr_index in range(len(scene_frame)):
            # the reference frame itself contributes a zero offset
if curr_index == ref_index:
origin_tf = np.array([0.0, 0.0, 0.0], dtype=np.float32)
else:
# transform from the current lidar frame to global and then to the reference lidar frame
global_from_curr = self.get_global_pose(scene_frame[curr_index], inverse=False)
ref_from_curr = ref_lidar_from_global.dot(global_from_curr)
origin_tf = np.array(ref_from_curr[:3, 3], dtype=np.float32)
origin_tf_pad = np.ones([4])
origin_tf_pad[:3] = origin_tf # pad to [4]
origin_tf = np.dot(ref_ego_from_lidar[:3], origin_tf_pad.T).T # [3]
            # keep only origins that fall inside the evaluated range
if np.abs(origin_tf[0]) < 39 and np.abs(origin_tf[1]) < 39:
output_origin_list.append(origin_tf)
# select 8 origins
if len(output_origin_list) > 8:
select_idx = np.round(np.linspace(0, len(output_origin_list) - 1, 8)).astype(np.int64)
output_origin_list = [output_origin_list[i] for i in select_idx]
output_origin_tensor = torch.from_numpy(np.stack(output_origin_list)) # [T, 3]
return (ref_sample_token, output_origin_tensor)
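
# --- Illustrative usage sketch (not part of the original file) ---
# EgoPoseDataset is consumed through a DataLoader, as in
# NuScenesDatasetOccpancy.evaluate(); `data_infos` here stands for the usual
# nuScenes info list and is a placeholder argument, not defined in this file.
def _demo_ego_pose_loader(data_infos):
    from torch.utils.data import DataLoader
    loader = DataLoader(EgoPoseDataset(data_infos), batch_size=1, shuffle=False)
    sample_token, origins = next(iter(loader))
    return sample_token[0], origins  # origins: [1, T, 3] with at most 8 selected frame origins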
# Copyright (c) OpenMMLab. All rights reserved.
import tempfile
from os import path as osp
import mmcv
import numpy as np
import pyquaternion
from nuscenes.utils.data_classes import Box as NuScenesBox
from mmdet3d.core import show_result
from mmdet3d.core.bbox import Box3DMode, Coord3DMode, LiDARInstance3DBoxes
from mmdet3d.datasets import DATASETS
from mmdet3d.datasets.custom_3d import Custom3DDataset
from mmdet3d.datasets.pipelines import Compose
@DATASETS.register_module()
class NuScenesDatasetBEVDet(Custom3DDataset):
r"""NuScenes Dataset.
This class serves as the API for experiments on the NuScenes Dataset.
Please refer to `NuScenes Dataset <https://www.nuscenes.org/download>`_
for data downloading.
Args:
ann_file (str): Path of annotation file.
pipeline (list[dict], optional): Pipeline used for data processing.
Defaults to None.
data_root (str): Path of dataset root.
classes (tuple[str], optional): Classes used in the dataset.
Defaults to None.
load_interval (int, optional): Interval of loading the dataset. It is
used to uniformly sample the dataset. Defaults to 1.
with_velocity (bool, optional): Whether include velocity prediction
into the experiments. Defaults to True.
modality (dict, optional): Modality to specify the sensor data used
as input. Defaults to None.
box_type_3d (str, optional): Type of 3D box of this dataset.
Based on the `box_type_3d`, the dataset will encapsulate the box
            to its original format then convert them to `box_type_3d`.
            Defaults to 'LiDAR' in this dataset. Available options include:
- 'LiDAR': Box in LiDAR coordinates.
- 'Depth': Box in depth coordinates, usually for indoor dataset.
- 'Camera': Box in camera coordinates.
filter_empty_gt (bool, optional): Whether to filter empty GT.
Defaults to True.
test_mode (bool, optional): Whether the dataset is in test mode.
Defaults to False.
        eval_version (str, optional): Configuration version of evaluation.
            Defaults to 'detection_cvpr_2019'.
use_valid_flag (bool, optional): Whether to use `use_valid_flag` key
in the info file as mask to filter gt_boxes and gt_names.
Defaults to False.
img_info_prototype (str, optional): Type of img information.
Based on 'img_info_prototype', the dataset will prepare the image
data info in the type of 'mmcv' for official image infos,
'bevdet' for BEVDet, and 'bevdet4d' for BEVDet4D.
Defaults to 'mmcv'.
        multi_adj_frame_id_cfg (tuple[int]): Define the selected index of
            reference adjacent frames.
ego_cam (str): Specify the ego coordinate relative to a specified
camera by its name defined in NuScenes.
Defaults to None, which use the mean of all cameras.
"""
NameMapping = {
'movable_object.barrier': 'barrier',
'vehicle.bicycle': 'bicycle',
'vehicle.bus.bendy': 'bus',
'vehicle.bus.rigid': 'bus',
'vehicle.car': 'car',
'vehicle.construction': 'construction_vehicle',
'vehicle.motorcycle': 'motorcycle',
'human.pedestrian.adult': 'pedestrian',
'human.pedestrian.child': 'pedestrian',
'human.pedestrian.construction_worker': 'pedestrian',
'human.pedestrian.police_officer': 'pedestrian',
'movable_object.trafficcone': 'traffic_cone',
'vehicle.trailer': 'trailer',
'vehicle.truck': 'truck'
}
DefaultAttribute = {
'car': 'vehicle.parked',
'pedestrian': 'pedestrian.moving',
'trailer': 'vehicle.parked',
'truck': 'vehicle.parked',
'bus': 'vehicle.moving',
'motorcycle': 'cycle.without_rider',
'construction_vehicle': 'vehicle.parked',
'bicycle': 'cycle.without_rider',
'barrier': '',
'traffic_cone': '',
}
AttrMapping = {
'cycle.with_rider': 0,
'cycle.without_rider': 1,
'pedestrian.moving': 2,
'pedestrian.standing': 3,
'pedestrian.sitting_lying_down': 4,
'vehicle.moving': 5,
'vehicle.parked': 6,
'vehicle.stopped': 7,
}
AttrMapping_rev = [
'cycle.with_rider',
'cycle.without_rider',
'pedestrian.moving',
'pedestrian.standing',
'pedestrian.sitting_lying_down',
'vehicle.moving',
'vehicle.parked',
'vehicle.stopped',
]
# https://github.com/nutonomy/nuscenes-devkit/blob/57889ff20678577025326cfc24e57424a829be0a/python-sdk/nuscenes/eval/detection/evaluate.py#L222 # noqa
ErrNameMapping = {
'trans_err': 'mATE',
'scale_err': 'mASE',
'orient_err': 'mAOE',
'vel_err': 'mAVE',
'attr_err': 'mAAE'
}
CLASSES = ('car', 'truck', 'trailer', 'bus', 'construction_vehicle',
'bicycle', 'motorcycle', 'pedestrian', 'traffic_cone',
'barrier')
def __init__(self,
ann_file,
pipeline=None,
data_root=None,
classes=None,
load_interval=1,
with_velocity=True,
modality=None,
box_type_3d='LiDAR',
filter_empty_gt=True,
test_mode=False,
eval_version='detection_cvpr_2019',
use_valid_flag=False,
img_info_prototype='mmcv',
multi_adj_frame_id_cfg=None,
ego_cam='CAM_FRONT',
stereo=False):
self.load_interval = load_interval
self.use_valid_flag = use_valid_flag
super().__init__(
data_root=data_root,
ann_file=ann_file,
pipeline=pipeline,
classes=classes,
modality=modality,
box_type_3d=box_type_3d,
filter_empty_gt=filter_empty_gt,
test_mode=test_mode)
self.with_velocity = with_velocity
self.eval_version = eval_version
from nuscenes.eval.detection.config import config_factory
self.eval_detection_configs = config_factory(self.eval_version)
if self.modality is None:
self.modality = dict(
use_camera=False,
use_lidar=True,
use_radar=False,
use_map=False,
use_external=False,
)
self.img_info_prototype = img_info_prototype
self.multi_adj_frame_id_cfg = multi_adj_frame_id_cfg
self.ego_cam = ego_cam
self.stereo = stereo
def get_cat_ids(self, idx):
"""Get category distribution of single scene.
Args:
idx (int): Index of the data_info.
Returns:
dict[list]: for each category, if the current scene
contains such boxes, store a list containing idx,
otherwise, store empty list.
"""
info = self.data_infos[idx]
if self.use_valid_flag:
mask = info['valid_flag']
gt_names = set(info['gt_names'][mask])
else:
gt_names = set(info['gt_names'])
cat_ids = []
for name in gt_names:
if name in self.CLASSES:
cat_ids.append(self.cat2id[name])
return cat_ids
def load_annotations(self, ann_file):
"""Load annotations from ann_file.
Args:
ann_file (str): Path of the annotation file.
Returns:
list[dict]: List of annotations sorted by timestamps.
"""
data = mmcv.load(ann_file, file_format='pkl')
data_infos = list(sorted(data['infos'], key=lambda e: e['timestamp']))
data_infos = data_infos[::self.load_interval]
self.metadata = data['metadata']
self.version = self.metadata['version']
return data_infos
def get_data_info(self, index):
"""Get data info according to the given index.
Args:
index (int): Index of the sample data to get.
Returns:
dict: Data information that will be passed to the data
preprocessing pipelines. It includes the following keys:
- sample_idx (str): Sample index.
- pts_filename (str): Filename of point clouds.
- sweeps (list[dict]): Infos of sweeps.
- timestamp (float): Sample timestamp.
- img_filename (str, optional): Image filename.
- lidar2img (list[np.ndarray], optional): Transformations
from lidar to different cameras.
- ann_info (dict): Annotation info.
"""
info = self.data_infos[index]
# standard protocol modified from SECOND.Pytorch
input_dict = dict(
sample_idx=info['token'],
pts_filename=info['lidar_path'],
sweeps=info['sweeps'],
timestamp=info['timestamp'] / 1e6,
)
if 'ann_infos' in info:
input_dict['ann_infos'] = info['ann_infos']
if self.modality['use_camera']:
if self.img_info_prototype == 'mmcv':
image_paths = []
lidar2img_rts = []
for cam_type, cam_info in info['cams'].items():
image_paths.append(cam_info['data_path'])
# obtain lidar to image transformation matrix
lidar2cam_r = np.linalg.inv(
cam_info['sensor2lidar_rotation'])
lidar2cam_t = cam_info[
'sensor2lidar_translation'] @ lidar2cam_r.T
lidar2cam_rt = np.eye(4)
lidar2cam_rt[:3, :3] = lidar2cam_r.T
lidar2cam_rt[3, :3] = -lidar2cam_t
intrinsic = cam_info['cam_intrinsic']
viewpad = np.eye(4)
viewpad[:intrinsic.shape[0], :intrinsic.
shape[1]] = intrinsic
lidar2img_rt = (viewpad @ lidar2cam_rt.T)
lidar2img_rts.append(lidar2img_rt)
input_dict.update(
dict(
img_filename=image_paths,
lidar2img=lidar2img_rts,
))
if not self.test_mode:
annos = self.get_ann_info(index)
input_dict['ann_info'] = annos
else:
assert 'bevdet' in self.img_info_prototype
input_dict.update(dict(curr=info))
                if '4d' in self.img_info_prototype:  # also need to load the info of historical (adjacent) frames
info_adj_list = self.get_adj_info(info, index)
input_dict.update(dict(adjacent=info_adj_list))
return input_dict
def get_adj_info(self, info, index):
info_adj_list = []
        adj_id_list = list(range(*self.multi_adj_frame_id_cfg))  # bevdet4d: [1], i.e. only the previous frame is used
if self.stereo:
assert self.multi_adj_frame_id_cfg[0] == 1
assert self.multi_adj_frame_id_cfg[2] == 1
            # With stereo4d, the current frame needs the previous frame's images to compute
            # stereo depth, and the previous frame in turn needs its own previous frame,
            # so one extra frame (the frame before the previous one) has to be loaded.
adj_id_list.append(self.multi_adj_frame_id_cfg[1])
for select_id in adj_id_list:
select_id = max(index - select_id, 0)
if not self.data_infos[select_id]['scene_token'] == info[
'scene_token']:
info_adj_list.append(info)
else:
info_adj_list.append(self.data_infos[select_id])
return info_adj_list
def get_ann_info(self, index):
"""Get annotation info according to the given index.
Args:
index (int): Index of the annotation data to get.
Returns:
dict: Annotation information consists of the following keys:
- gt_bboxes_3d (:obj:`LiDARInstance3DBoxes`):
3D ground truth bboxes
- gt_labels_3d (np.ndarray): Labels of ground truths.
- gt_names (list[str]): Class names of ground truths.
"""
info = self.data_infos[index]
# filter out bbox containing no points
if self.use_valid_flag:
mask = info['valid_flag']
else:
mask = info['num_lidar_pts'] > 0
gt_bboxes_3d = info['gt_boxes'][mask]
gt_names_3d = info['gt_names'][mask]
gt_labels_3d = []
for cat in gt_names_3d:
if cat in self.CLASSES:
gt_labels_3d.append(self.CLASSES.index(cat))
else:
gt_labels_3d.append(-1)
gt_labels_3d = np.array(gt_labels_3d)
if self.with_velocity:
gt_velocity = info['gt_velocity'][mask]
nan_mask = np.isnan(gt_velocity[:, 0])
gt_velocity[nan_mask] = [0.0, 0.0]
gt_bboxes_3d = np.concatenate([gt_bboxes_3d, gt_velocity], axis=-1)
# the nuscenes box center is [0.5, 0.5, 0.5], we change it to be
# the same as KITTI (0.5, 0.5, 0)
gt_bboxes_3d = LiDARInstance3DBoxes(
gt_bboxes_3d,
box_dim=gt_bboxes_3d.shape[-1],
origin=(0.5, 0.5, 0.5)).convert_to(self.box_mode_3d)
anns_results = dict(
gt_bboxes_3d=gt_bboxes_3d,
gt_labels_3d=gt_labels_3d,
gt_names=gt_names_3d)
return anns_results
def _format_bbox(self, results, jsonfile_prefix=None):
"""Convert the results to the standard format.
Args:
results (list[dict]): Testing results of the dataset.
jsonfile_prefix (str): The prefix of the output jsonfile.
You can specify the output directory/filename by
modifying the jsonfile_prefix. Default: None.
Returns:
str: Path of the output json file.
"""
nusc_annos = {}
mapped_class_names = self.CLASSES
print('Start to convert detection format...')
for sample_id, det in enumerate(mmcv.track_iter_progress(results)):
boxes = det['boxes_3d'].tensor.numpy()
scores = det['scores_3d'].numpy()
labels = det['labels_3d'].numpy()
sample_token = self.data_infos[sample_id]['token']
trans = self.data_infos[sample_id]['cams'][
self.ego_cam]['ego2global_translation']
rot = self.data_infos[sample_id]['cams'][
self.ego_cam]['ego2global_rotation']
rot = pyquaternion.Quaternion(rot)
annos = list()
for i, box in enumerate(boxes):
name = mapped_class_names[labels[i]]
center = box[:3]
wlh = box[[4, 3, 5]]
box_yaw = box[6]
box_vel = box[7:].tolist()
box_vel.append(0)
quat = pyquaternion.Quaternion(axis=[0, 0, 1], radians=box_yaw)
nusc_box = NuScenesBox(center, wlh, quat, velocity=box_vel)
nusc_box.rotate(rot)
nusc_box.translate(trans)
if np.sqrt(nusc_box.velocity[0]**2 +
nusc_box.velocity[1]**2) > 0.2:
if name in [
'car',
'construction_vehicle',
'bus',
'truck',
'trailer',
]:
attr = 'vehicle.moving'
elif name in ['bicycle', 'motorcycle']:
attr = 'cycle.with_rider'
else:
attr = self.DefaultAttribute[name]
else:
if name in ['pedestrian']:
attr = 'pedestrian.standing'
elif name in ['bus']:
attr = 'vehicle.stopped'
else:
attr = self.DefaultAttribute[name]
nusc_anno = dict(
sample_token=sample_token,
translation=nusc_box.center.tolist(),
size=nusc_box.wlh.tolist(),
rotation=nusc_box.orientation.elements.tolist(),
velocity=nusc_box.velocity[:2],
detection_name=name,
detection_score=float(scores[i]),
attribute_name=attr,
)
annos.append(nusc_anno)
# other views results of the same frame should be concatenated
if sample_token in nusc_annos:
nusc_annos[sample_token].extend(annos)
else:
nusc_annos[sample_token] = annos
nusc_submissions = {
'meta': self.modality,
'results': nusc_annos,
}
mmcv.mkdir_or_exist(jsonfile_prefix)
res_path = osp.join(jsonfile_prefix, 'results_nusc.json')
        print('Results written to', res_path)
mmcv.dump(nusc_submissions, res_path)
return res_path
def _evaluate_single(self,
result_path,
logger=None,
metric='bbox',
result_name='pts_bbox'):
"""Evaluation for a single model in nuScenes protocol.
Args:
result_path (str): Path of the result file.
logger (logging.Logger | str, optional): Logger used for printing
related information during evaluation. Default: None.
metric (str, optional): Metric name used for evaluation.
Default: 'bbox'.
result_name (str, optional): Result name in the metric prefix.
Default: 'pts_bbox'.
Returns:
dict: Dictionary of evaluation details.
"""
from nuscenes import NuScenes
from nuscenes.eval.detection.evaluate import NuScenesEval
output_dir = osp.join(*osp.split(result_path)[:-1])
nusc = NuScenes(
version=self.version, dataroot=self.data_root, verbose=False)
eval_set_map = {
'v1.0-mini': 'mini_val',
'v1.0-trainval': 'val',
}
nusc_eval = NuScenesEval(
nusc,
config=self.eval_detection_configs,
result_path=result_path,
eval_set=eval_set_map[self.version],
output_dir=output_dir,
verbose=False)
nusc_eval.main(render_curves=False)
# record metrics
metrics = mmcv.load(osp.join(output_dir, 'metrics_summary.json'))
detail = dict()
metric_prefix = f'{result_name}_NuScenes'
for name in self.CLASSES:
for k, v in metrics['label_aps'][name].items():
val = float('{:.4f}'.format(v))
detail['{}/{}_AP_dist_{}'.format(metric_prefix, name, k)] = val
for k, v in metrics['label_tp_errors'][name].items():
val = float('{:.4f}'.format(v))
detail['{}/{}_{}'.format(metric_prefix, name, k)] = val
for k, v in metrics['tp_errors'].items():
val = float('{:.4f}'.format(v))
detail['{}/{}'.format(metric_prefix,
self.ErrNameMapping[k])] = val
detail['{}/NDS'.format(metric_prefix)] = metrics['nd_score']
detail['{}/mAP'.format(metric_prefix)] = metrics['mean_ap']
return detail
def format_results(self, results, jsonfile_prefix=None):
"""Format the results to json (standard format for COCO evaluation).
Args:
results (list[dict]): Testing results of the dataset.
jsonfile_prefix (str): The prefix of json files. It includes
the file path and the prefix of filename, e.g., "a/b/prefix".
If not specified, a temp file will be created. Default: None.
Returns:
            tuple: Returns (result_files, tmp_dir), where `result_files` is a
                dict containing the json filepaths, `tmp_dir` is the temporary
                directory created for saving json files when
                `jsonfile_prefix` is not specified.
"""
assert isinstance(results, list), 'results must be a list'
assert len(results) == len(self), (
'The length of results is not equal to the dataset len: {} != {}'.
format(len(results), len(self)))
if jsonfile_prefix is None:
tmp_dir = tempfile.TemporaryDirectory()
jsonfile_prefix = osp.join(tmp_dir.name, 'results')
else:
tmp_dir = None
# currently the output prediction results could be in two formats
# 1. list of dict('boxes_3d': ..., 'scores_3d': ..., 'labels_3d': ...)
# 2. list of dict('pts_bbox' or 'img_bbox':
# dict('boxes_3d': ..., 'scores_3d': ..., 'labels_3d': ...))
# this is a workaround to enable evaluation of both formats on nuScenes
# refer to https://github.com/open-mmlab/mmdetection3d/issues/449
if not ('pts_bbox' in results[0] or 'img_bbox' in results[0]):
result_files = self._format_bbox(results, jsonfile_prefix)
else:
# should take the inner dict out of 'pts_bbox' or 'img_bbox' dict
result_files = dict()
for name in results[0]:
                print(f'\nFormatting bboxes of {name}')
results_ = [out[name] for out in results]
# List[dict0, dict1, ...]
# dict: {
# 'boxes_3d': (N, 9)
# 'scores_3d': (N, )
# 'labels_3d': (N, )
# }
tmp_file_ = osp.join(jsonfile_prefix, name)
result_files.update(
{name: self._format_bbox(results_, tmp_file_)})
return result_files, tmp_dir
def evaluate(self,
results,
metric='bbox',
logger=None,
jsonfile_prefix=None,
result_names=['pts_bbox'],
show=False,
out_dir=None,
pipeline=None):
"""Evaluation in nuScenes protocol.
Args:
results (list[dict]): Testing results of the dataset.
metric (str | list[str], optional): Metrics to be evaluated.
Default: 'bbox'.
logger (logging.Logger | str, optional): Logger used for printing
related information during evaluation. Default: None.
jsonfile_prefix (str, optional): The prefix of json files including
the file path and the prefix of filename, e.g., "a/b/prefix".
If not specified, a temp file will be created. Default: None.
show (bool, optional): Whether to visualize.
Default: False.
out_dir (str, optional): Path to save the visualization results.
Default: None.
pipeline (list[dict], optional): raw data loading for showing.
Default: None.
Returns:
dict[str, float]: Results of each evaluation metric.
"""
result_files, tmp_dir = self.format_results(results, jsonfile_prefix)
if isinstance(result_files, dict):
results_dict = dict()
for name in result_names:
print('Evaluating bboxes of {}'.format(name))
ret_dict = self._evaluate_single(result_files[name])
results_dict.update(ret_dict)
elif isinstance(result_files, str):
results_dict = self._evaluate_single(result_files)
if tmp_dir is not None:
tmp_dir.cleanup()
if show or out_dir:
self.show(results, out_dir, show=show, pipeline=pipeline)
return results_dict
def _build_default_pipeline(self):
"""Build the default pipeline for this dataset."""
pipeline = [
dict(
type='LoadPointsFromFile',
coord_type='LIDAR',
load_dim=5,
use_dim=5,
file_client_args=dict(backend='disk')),
dict(
type='LoadPointsFromMultiSweeps',
sweeps_num=10,
file_client_args=dict(backend='disk')),
dict(
type='DefaultFormatBundle3D',
class_names=self.CLASSES,
with_label=False),
dict(type='Collect3D', keys=['points'])
]
return Compose(pipeline)
def show(self, results, out_dir, show=False, pipeline=None):
"""Results visualization.
Args:
results (list[dict]): List of bounding boxes results.
out_dir (str): Output directory of visualization result.
show (bool): Whether to visualize the results online.
Default: False.
pipeline (list[dict], optional): raw data loading for showing.
Default: None.
"""
assert out_dir is not None, 'Expect out_dir, got none.'
pipeline = self._get_pipeline(pipeline)
for i, result in enumerate(results):
if 'pts_bbox' in result.keys():
result = result['pts_bbox']
data_info = self.data_infos[i]
pts_path = data_info['lidar_path']
file_name = osp.split(pts_path)[-1].split('.')[0]
points = self._extract_data(i, pipeline, 'points').numpy()
# for now we convert points into depth mode
points = Coord3DMode.convert_point(points, Coord3DMode.LIDAR,
Coord3DMode.DEPTH)
inds = result['scores_3d'] > 0.1
gt_bboxes = self.get_ann_info(i)['gt_bboxes_3d'].tensor.numpy()
show_gt_bboxes = Box3DMode.convert(gt_bboxes, Box3DMode.LIDAR,
Box3DMode.DEPTH)
pred_bboxes = result['boxes_3d'][inds].tensor.numpy()
show_pred_bboxes = Box3DMode.convert(pred_bboxes, Box3DMode.LIDAR,
Box3DMode.DEPTH)
show_result(points, show_gt_bboxes, show_pred_bboxes, out_dir,
file_name, show)
def output_to_nusc_box(detection, with_velocity=True):
"""Convert the output to the box class in the nuScenes.
Args:
detection (dict): Detection results.
- boxes_3d (:obj:`BaseInstance3DBoxes`): Detection bbox.
- scores_3d (torch.Tensor): Detection scores.
- labels_3d (torch.Tensor): Predicted box labels.
Returns:
list[:obj:`NuScenesBox`]: List of standard NuScenesBoxes.
"""
box3d = detection['boxes_3d']
scores = detection['scores_3d'].numpy()
labels = detection['labels_3d'].numpy()
box_gravity_center = box3d.gravity_center.numpy()
box_dims = box3d.dims.numpy()
box_yaw = box3d.yaw.numpy()
# our LiDAR coordinate system -> nuScenes box coordinate system
nus_box_dims = box_dims[:, [1, 0, 2]]
box_list = []
for i in range(len(box3d)):
quat = pyquaternion.Quaternion(axis=[0, 0, 1], radians=box_yaw[i])
if with_velocity:
velocity = (*box3d.tensor[i, 7:9], 0.0)
else:
velocity = (0, 0, 0)
# velo_val = np.linalg.norm(box3d[i, 7:9])
# velo_ori = box3d[i, 6]
# velocity = (
# velo_val * np.cos(velo_ori), velo_val * np.sin(velo_ori), 0.0)
box = NuScenesBox(
box_gravity_center[i],
nus_box_dims[i],
quat,
label=labels[i],
score=scores[i],
velocity=velocity)
box_list.append(box)
return box_list
def lidar_nusc_box_to_global(info,
boxes,
classes,
eval_configs,
eval_version='detection_cvpr_2019'):
"""Convert the box from ego to global coordinate.
Args:
info (dict): Info for a specific sample data, including the
calibration information.
boxes (list[:obj:`NuScenesBox`]): List of predicted NuScenesBoxes.
classes (list[str]): Mapped classes in the evaluation.
eval_configs (object): Evaluation configuration object.
eval_version (str, optional): Evaluation version.
Default: 'detection_cvpr_2019'
Returns:
list: List of standard NuScenesBoxes in the global
coordinate.
"""
box_list = []
for box in boxes:
# Move box to ego vehicle coord system
box.rotate(pyquaternion.Quaternion(info['lidar2ego_rotation']))
box.translate(np.array(info['lidar2ego_translation']))
# filter det in ego.
cls_range_map = eval_configs.class_range
radius = np.linalg.norm(box.center[:2], 2)
det_range = cls_range_map[classes[box.label]]
if radius > det_range:
continue
# Move box to global coord system
box.rotate(pyquaternion.Quaternion(info['ego2global_rotation']))
box.translate(np.array(info['ego2global_translation']))
box_list.append(box)
return box_list
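
# --- Illustrative usage sketch (not part of the original file) ---
# Typical chaining when exporting detections: turn one result dict into
# NuScenesBox objects, then lift them from the lidar frame to the global frame
# with the sample's calibration info. All four arguments are placeholders
# supplied by the caller; the boxes end up in the global frame expected by the
# nuScenes evaluation.
def _demo_box_export(detection, info, classes, eval_configs):
    boxes = output_to_nusc_box(detection, with_velocity=True)
    return lidar_nusc_box_to_global(info, boxes, classes, eval_configs)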
# Copyright (c) OpenMMLab. All rights reserved.
import os
import mmcv
import torch
import cv2
import numpy as np
from tqdm import tqdm
from mmdet3d.datasets import DATASETS
from .nuscenes_dataset_bevdet import NuScenesDatasetBEVDet as NuScenesDataset
from ..core.evaluation.occ_metrics import Metric_mIoU, Metric_FScore
from .ego_pose_dataset import EgoPoseDataset
from ..core.evaluation.ray_metrics import main as calc_rayiou
from torch.utils.data import DataLoader
from ..core.evaluation.ray_metrics import main_raypq
import glob
colors_map = np.array(
[
[0, 0, 0, 255], # 0 undefined
[255, 158, 0, 255], # 1 car orange
[0, 0, 230, 255], # 2 pedestrian Blue
[47, 79, 79, 255], # 3 sign Darkslategrey
[220, 20, 60, 255], # 4 CYCLIST Crimson
        [255, 69, 0, 255],   # 5 traffic_light Orangered
[255, 140, 0, 255], # 6 pole Darkorange
[233, 150, 70, 255], # 7 construction_cone Darksalmon
        [255, 61, 99, 255],  # 8 bicycle Red
[112, 128, 144, 255],# 9 motorcycle Slategrey
[222, 184, 135, 255],# 10 building Burlywood
[0, 175, 0, 255], # 11 vegetation Green
[165, 42, 42, 255], # 12 trunk nuTonomy green
[0, 207, 191, 255], # 13 curb, road, lane_marker, other_ground
[75, 0, 75, 255], # 14 walkable, sidewalk
        [255, 0, 0, 255],    # 15 unobserved
        [0, 0, 0, 0],        # 16 undefined
        [0, 0, 0, 0],        # 17 undefined
])
@DATASETS.register_module()
class NuScenesDatasetOccpancy(NuScenesDataset):
def get_data_info(self, index):
"""Get data info according to the given index.
Args:
index (int): Index of the sample data to get.
Returns:
dict: Data information that will be passed to the data
preprocessing pipelines. It includes the following keys:
- sample_idx (str): Sample index.
- pts_filename (str): Filename of point clouds.
- sweeps (list[dict]): Infos of sweeps.
- timestamp (float): Sample timestamp.
- img_filename (str, optional): Image filename.
- lidar2img (list[np.ndarray], optional): Transformations
from lidar to different cameras.
- ann_info (dict): Annotation info.
"""
input_dict = super(NuScenesDatasetOccpancy, self).get_data_info(index)
# standard protocol modified from SECOND.Pytorch
# input_dict['occ_gt_path'] = os.path.join(self.data_root, self.data_infos[index]['occ_path'])
input_dict['occ_gt_path'] = self.data_infos[index]['occ_path']
return input_dict
def evaluate(self, occ_results, runner=None, show_dir=None, **eval_kwargs):
metric = eval_kwargs['metric'][0]
print("metric = ", metric)
if metric == 'ray-iou':
occ_gts = []
occ_preds = []
lidar_origins = []
inst_gts = []
inst_preds = []
print('\nStarting Evaluation...')
data_loader = DataLoader(
EgoPoseDataset(self.data_infos),
batch_size=1,
shuffle=False,
num_workers=8
)
sample_tokens = [info['token'] for info in self.data_infos]
for i, batch in enumerate(data_loader):
# if i > 5:
# break
token = batch[0][0]
output_origin = batch[1]
data_id = sample_tokens.index(token)
info = self.data_infos[data_id]
# occ_gt = np.load(os.path.join(self.data_root, info['occ_path'], 'labels.npz'))
# occ_gt = np.load(os.path.join(info['occ_path'], 'labels.npz'))
occ_gt = np.load(os.path.join(info['occ_path'].replace('data/nuscenes/gts/', 'data/nuscenes/occ3d_panoptic/'), 'labels.npz'))
gt_semantics = occ_gt['semantics'] # (Dx, Dy, Dz)
mask_lidar = occ_gt['mask_lidar'].astype(bool) # (Dx, Dy, Dz)
mask_camera = occ_gt['mask_camera'].astype(bool) # (Dx, Dy, Dz)
occ_pred = occ_results[data_id]['pred_occ'].cpu().numpy() # (Dx, Dy, Dz)
# occ_pred = occ_results[data_id]['pred_occ'] # (Dx, Dy, Dz)
lidar_origins.append(output_origin)
occ_gts.append(gt_semantics)
occ_preds.append(occ_pred)
if 'pano_inst' in occ_results[data_id].keys():
pano_inst = occ_results[data_id]['pano_inst'].cpu()
# pano_inst = torch.from_numpy(occ_results[data_id]['pano_inst'])
pano_inst = pano_inst.squeeze(0).numpy()
gt_instances = occ_gt['instances']
inst_gts.append(gt_instances)
inst_preds.append(pano_inst)
eval_results = calc_rayiou(occ_preds, occ_gts, lidar_origins)
if len(inst_preds) > 0:
eval_results.update(main_raypq(occ_preds, occ_gts, inst_preds, inst_gts, lidar_origins))
# eval_results = main_raypq(occ_preds, occ_gts, inst_preds, inst_gts, lidar_origins)
else:
self.occ_eval_metrics = Metric_mIoU(
num_classes=18,
use_lidar_mask=False,
use_image_mask=True)
print('\nStarting Evaluation...')
for index, occ_pred in enumerate(tqdm(occ_results)):
# occ_pred: (Dx, Dy, Dz)
info = self.data_infos[index]
# occ_gt = np.load(os.path.join(self.data_root, info['occ_path'], 'labels.npz'))
occ_gt = np.load(os.path.join(info['occ_path'], 'labels.npz'))
gt_semantics = occ_gt['semantics'] # (Dx, Dy, Dz)
mask_lidar = occ_gt['mask_lidar'].astype(bool) # (Dx, Dy, Dz)
mask_camera = occ_gt['mask_camera'].astype(bool) # (Dx, Dy, Dz)
                # unwrap dict outputs that store the prediction under 'pred_occ'
                if isinstance(occ_pred, dict) and 'pred_occ' in occ_pred:
                    occ_pred = occ_pred['pred_occ']
                self.occ_eval_metrics.add_batch(
                    occ_pred,        # (Dx, Dy, Dz)
                    gt_semantics,    # (Dx, Dy, Dz)
                    mask_lidar,      # (Dx, Dy, Dz)
                    mask_camera      # (Dx, Dy, Dz)
                )
# if index % 100 == 0 and show_dir is not None:
# gt_vis = self.vis_occ(gt_semantics)
# pred_vis = self.vis_occ(occ_pred)
# mmcv.imwrite(np.concatenate([gt_vis, pred_vis], axis=1),
# os.path.join(show_dir + "%d.jpg"%index))
if show_dir is not None:
mmcv.mkdir_or_exist(show_dir)
# scene_name = info['scene_name']
scene_name = [tem for tem in info['occ_path'].split('/') if 'scene-' in tem][0]
sample_token = info['token']
mmcv.mkdir_or_exist(os.path.join(show_dir, scene_name, sample_token))
save_path = os.path.join(show_dir, scene_name, sample_token, 'pred.npz')
                    np.savez_compressed(save_path,
                                        pred=occ_pred,
                                        gt=occ_gt,
                                        sample_token=sample_token)
eval_results = self.occ_eval_metrics.count_miou()
return eval_results
def vis_occ(self, semantics):
# simple visualization of result in BEV
semantics_valid = np.logical_not(semantics == 17)
d = np.arange(16).reshape(1, 1, 16)
d = np.repeat(d, 200, axis=0)
d = np.repeat(d, 200, axis=1).astype(np.float32)
d = d * semantics_valid
selected = np.argmax(d, axis=2)
selected_torch = torch.from_numpy(selected)
semantics_torch = torch.from_numpy(semantics)
occ_bev_torch = torch.gather(semantics_torch, dim=2,
index=selected_torch.unsqueeze(-1))
occ_bev = occ_bev_torch.numpy()
occ_bev = occ_bev.flatten().astype(np.int32)
occ_bev_vis = colors_map[occ_bev].astype(np.uint8)
occ_bev_vis = occ_bev_vis.reshape(200, 200, 4)[::-1, ::-1, :3]
occ_bev_vis = cv2.resize(occ_bev_vis,(400,400))
return occ_bev_vis
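# --- Illustrative sketch (not part of the original file) --------------------
# vis_occ picks, for every BEV cell, the highest non-free voxel along z by
# multiplying a 0..15 height index with the validity mask and taking argmax.
# The toy grid below (2x2x16 semantics, class 17 = free) shows the same trick.
def _demo_highest_occupied_voxel():
    semantics = np.full((2, 2, 16), 17, dtype=np.int64)  # everything free
    semantics[0, 0, 3] = 4   # occupied voxel at height index 3
    semantics[0, 0, 7] = 9   # another occupied voxel higher up, at index 7
    valid = np.logical_not(semantics == 17)
    d = np.arange(16).reshape(1, 1, 16) * valid
    selected = np.argmax(d, axis=2)            # (2, 2), highest valid z index
    bev = np.take_along_axis(semantics, selected[..., None], axis=2)[..., 0]
    return bev  # bev[0, 0] == 9; empty cells fall back to z index 0 (free)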
from .loading import PrepareImageInputs, LoadAnnotationsBEVDepth, PointToMultiViewDepth
from mmdet3d.datasets.pipelines import LoadPointsFromFile
from mmdet3d.datasets.pipelines import ObjectRangeFilter, ObjectNameFilter
from .formating import DefaultFormatBundle3D, Collect3D
__all__ = ['PrepareImageInputs', 'LoadAnnotationsBEVDepth', 'ObjectRangeFilter', 'ObjectNameFilter',
'PointToMultiViewDepth', 'DefaultFormatBundle3D', 'Collect3D']
# Copyright (c) OpenMMLab. All rights reserved.
import numpy as np
from mmcv.parallel import DataContainer as DC
from mmdet3d.core.bbox import BaseInstance3DBoxes
from mmdet3d.core.points import BasePoints
from mmdet.datasets.pipelines import to_tensor
from mmdet3d.datasets.builder import PIPELINES
@PIPELINES.register_module(force=True)
class DefaultFormatBundle(object):
"""Default formatting bundle.
It simplifies the pipeline of formatting common fields, including "img",
"proposals", "gt_bboxes", "gt_labels", "gt_masks" and "gt_semantic_seg".
These fields are formatted as follows.
- img: (1)transpose, (2)to tensor, (3)to DataContainer (stack=True)
- proposals: (1)to tensor, (2)to DataContainer
- gt_bboxes: (1)to tensor, (2)to DataContainer
- gt_bboxes_ignore: (1)to tensor, (2)to DataContainer
- gt_labels: (1)to tensor, (2)to DataContainer
- gt_masks: (1)to tensor, (2)to DataContainer (cpu_only=True)
- gt_semantic_seg: (1)unsqueeze dim-0 (2)to tensor,
(3)to DataContainer (stack=True)
"""
def __init__(self, ):
return
def __call__(self, results):
"""Call function to transform and format common fields in results.
Args:
results (dict): Result dict contains the data to convert.
Returns:
dict: The result dict contains the data that is formatted with
default bundle.
"""
if 'img' in results:
if isinstance(results['img'], list):
# process multiple imgs in single frame
imgs = [img.transpose(2, 0, 1) for img in results['img']]
imgs = np.ascontiguousarray(np.stack(imgs, axis=0))
results['img'] = DC(to_tensor(imgs), stack=True)
else:
img = np.ascontiguousarray(results['img'].transpose(2, 0, 1))
results['img'] = DC(to_tensor(img), stack=True)
for key in [
'proposals', 'gt_bboxes', 'gt_bboxes_ignore', 'gt_labels',
'gt_labels_3d', 'attr_labels', 'pts_instance_mask',
'pts_semantic_mask', 'centers2d', 'depths'
]:
if key not in results:
continue
if isinstance(results[key], list):
results[key] = DC([to_tensor(res) for res in results[key]])
else:
results[key] = DC(to_tensor(results[key]))
if 'gt_bboxes_3d' in results:
if isinstance(results['gt_bboxes_3d'], BaseInstance3DBoxes):
results['gt_bboxes_3d'] = DC(
results['gt_bboxes_3d'], cpu_only=True)
else:
results['gt_bboxes_3d'] = DC(
to_tensor(results['gt_bboxes_3d']))
if 'gt_masks' in results:
results['gt_masks'] = DC(results['gt_masks'], cpu_only=True)
if 'gt_semantic_seg' in results:
results['gt_semantic_seg'] = DC(
to_tensor(results['gt_semantic_seg'][None, ...]), stack=True)
return results
def __repr__(self):
return self.__class__.__name__
@PIPELINES.register_module(force=True)
class Collect3D(object):
"""Collect data from the loader relevant to the specific task.
This is usually the last stage of the data loader pipeline. Typically keys
is set to some subset of "img", "proposals", "gt_bboxes",
"gt_bboxes_ignore", "gt_labels", and/or "gt_masks".
The "img_meta" item is always populated. The contents of the "img_meta"
dictionary depends on "meta_keys". By default this includes:
- 'img_shape': shape of the image input to the network as a tuple
(h, w, c). Note that images may be zero padded on the
bottom/right if the batch tensor is larger than this shape.
- 'scale_factor': a float indicating the preprocessing scale
- 'flip': a boolean indicating if image flip transform was used
- 'filename': path to the image file
- 'ori_shape': original shape of the image as a tuple (h, w, c)
- 'pad_shape': image shape after padding
- 'lidar2img': transform from lidar to image
- 'depth2img': transform from depth to image
- 'cam2img': transform from camera to image
- 'pcd_horizontal_flip': a boolean indicating if point cloud is
flipped horizontally
- 'pcd_vertical_flip': a boolean indicating if point cloud is
flipped vertically
- 'box_mode_3d': 3D box mode
- 'box_type_3d': 3D box type
- 'img_norm_cfg': a dict of normalization information:
- mean: per channel mean subtraction
- std: per channel std divisor
- to_rgb: bool indicating if bgr was converted to rgb
- 'pcd_trans': point cloud transformations
- 'sample_idx': sample index
- 'pcd_scale_factor': point cloud scale factor
- 'pcd_rotation': rotation applied to point cloud
- 'pts_filename': path to point cloud file.
Args:
keys (Sequence[str]): Keys of results to be collected in ``data``.
meta_keys (Sequence[str], optional): Meta keys to be converted to
``mmcv.DataContainer`` and collected in ``data[img_metas]``.
Default: ('filename', 'ori_shape', 'img_shape', 'lidar2img',
'depth2img', 'cam2img', 'pad_shape', 'scale_factor', 'flip',
'pcd_horizontal_flip', 'pcd_vertical_flip', 'box_mode_3d',
'box_type_3d', 'img_norm_cfg', 'pcd_trans',
'sample_idx', 'pcd_scale_factor', 'pcd_rotation', 'pts_filename')
"""
def __init__(
self,
keys,
meta_keys=('filename', 'ori_shape', 'img_shape', 'lidar2img',
'depth2img', 'cam2img', 'pad_shape', 'scale_factor', 'flip',
'pcd_horizontal_flip', 'pcd_vertical_flip', 'box_mode_3d',
'box_type_3d', 'img_norm_cfg', 'pcd_trans', 'sample_idx',
'pcd_scale_factor', 'pcd_rotation', 'pcd_rotation_angle',
'pts_filename', 'transformation_3d_flow', 'trans_mat',
'affine_aug')):
self.keys = keys
self.meta_keys = meta_keys
def __call__(self, results):
"""Call function to collect keys in results. The keys in ``meta_keys``
will be converted to :obj:`mmcv.DataContainer`.
Args:
results (dict): Result dict contains the data to collect.
Returns:
dict: The result dict contains the following keys
- keys in ``self.keys``
- ``img_metas``
"""
data = {}
img_metas = {}
for key in self.meta_keys:
if key in results:
img_metas[key] = results[key]
data['img_metas'] = DC(img_metas, cpu_only=True)
for key in self.keys:
data[key] = results[key]
return data
def __repr__(self):
"""str: Return a string that describes the module."""
return self.__class__.__name__ + \
f'(keys={self.keys}, meta_keys={self.meta_keys})'
@PIPELINES.register_module(force=True)
class DefaultFormatBundle3D(DefaultFormatBundle):
"""Default formatting bundle.
It simplifies the pipeline of formatting common fields for voxels,
including "proposals", "gt_bboxes", "gt_labels", "gt_masks" and
"gt_semantic_seg".
These fields are formatted as follows.
- img: (1)transpose, (2)to tensor, (3)to DataContainer (stack=True)
- proposals: (1)to tensor, (2)to DataContainer
- gt_bboxes: (1)to tensor, (2)to DataContainer
- gt_bboxes_ignore: (1)to tensor, (2)to DataContainer
- gt_labels: (1)to tensor, (2)to DataContainer
"""
def __init__(self, class_names, with_gt=True, with_label=True):
super(DefaultFormatBundle3D, self).__init__()
self.class_names = class_names
self.with_gt = with_gt
self.with_label = with_label
def __call__(self, results):
"""Call function to transform and format common fields in results.
Args:
results (dict): Result dict contains the data to convert.
Returns:
dict: The result dict contains the data that is formatted with
default bundle.
"""
# Format 3D data
if 'points' in results:
assert isinstance(results['points'], BasePoints)
results['points'] = DC(results['points'].tensor)
for key in ['voxels', 'coors', 'voxel_centers', 'num_points']:
if key not in results:
continue
results[key] = DC(to_tensor(results[key]), stack=False)
if self.with_gt:
# Clean GT bboxes in the final
if 'gt_bboxes_3d_mask' in results:
gt_bboxes_3d_mask = results['gt_bboxes_3d_mask']
results['gt_bboxes_3d'] = results['gt_bboxes_3d'][
gt_bboxes_3d_mask]
if 'gt_names_3d' in results:
results['gt_names_3d'] = results['gt_names_3d'][
gt_bboxes_3d_mask]
if 'centers2d' in results:
results['centers2d'] = results['centers2d'][
gt_bboxes_3d_mask]
if 'depths' in results:
results['depths'] = results['depths'][gt_bboxes_3d_mask]
if 'gt_bboxes_mask' in results:
gt_bboxes_mask = results['gt_bboxes_mask']
if 'gt_bboxes' in results:
results['gt_bboxes'] = results['gt_bboxes'][gt_bboxes_mask]
results['gt_names'] = results['gt_names'][gt_bboxes_mask]
if self.with_label:
if 'gt_names' in results and len(results['gt_names']) == 0:
results['gt_labels'] = np.array([], dtype=np.int64)
results['attr_labels'] = np.array([], dtype=np.int64)
elif 'gt_names' in results and isinstance(
results['gt_names'][0], list):
# gt_labels might be a list of list in multi-view setting
results['gt_labels'] = [
np.array([self.class_names.index(n) for n in res],
dtype=np.int64) for res in results['gt_names']
]
elif 'gt_names' in results:
results['gt_labels'] = np.array([
self.class_names.index(n) for n in results['gt_names']
],
dtype=np.int64)
# we still assume one pipeline for one frame LiDAR
# thus, the 3D name is list[string]
if 'gt_names_3d' in results:
results['gt_labels_3d'] = np.array([
self.class_names.index(n)
for n in results['gt_names_3d']
],
dtype=np.int64)
results = super(DefaultFormatBundle3D, self).__call__(results)
return results
def __repr__(self):
"""str: Return a string that describes the module."""
repr_str = self.__class__.__name__
repr_str += f'(class_names={self.class_names}, '
repr_str += f'with_gt={self.with_gt}, with_label={self.with_label})'
return repr_str
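# --- Illustrative sketch (not part of the original file) --------------------
# In an mmdet3d-style config, the two formatting steps above usually sit at
# the very end of a pipeline; the class names and keys below are assumed
# placeholders, not values taken from this repository's configs.
_example_pipeline_tail = [
    dict(type='DefaultFormatBundle3D', class_names=['car', 'pedestrian']),
    dict(type='Collect3D', keys=['points', 'img_inputs', 'gt_bboxes_3d', 'gt_labels_3d']),
]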
# Copyright (c) OpenMMLab. All rights reserved.
import os
import mmcv
import numpy as np
import torch
from PIL import Image
from pyquaternion import Quaternion
from mmdet3d.core.points import BasePoints, get_points_type
from mmdet.datasets.pipelines import LoadAnnotations, LoadImageFromFile
from mmdet3d.core.bbox import LiDARInstance3DBoxes
from mmdet3d.datasets.builder import PIPELINES
from torchvision.transforms.functional import rotate
def mmlabNormalize(img):
from mmcv.image.photometric import imnormalize
mean = np.array([123.675, 116.28, 103.53], dtype=np.float32)
std = np.array([58.395, 57.12, 57.375], dtype=np.float32)
to_rgb = True
img = imnormalize(np.array(img), mean, std, to_rgb)
img = torch.tensor(img).float().permute(2, 0, 1).contiguous()
return img
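# --- Illustrative sketch (not part of the original file) --------------------
# mmlabNormalize takes a PIL image and returns a float (3, H, W) tensor
# normalized by mmcv.imnormalize with the ImageNet mean/std above. A quick
# shape/dtype check on a dummy image:
def _demo_mmlab_normalize():
    dummy = Image.fromarray(np.zeros((8, 16, 3), dtype=np.uint8))
    out = mmlabNormalize(dummy)
    assert out.shape == (3, 8, 16) and out.dtype == torch.float32
    return out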
@PIPELINES.register_module()
class PrepareImageInputs(object):
def __init__(
self,
data_config,
is_train=False,
sequential=False,
):
self.is_train = is_train
self.data_config = data_config
self.normalize_img = mmlabNormalize
self.sequential = sequential
def choose_cams(self):
"""
Returns:
cam_names: List[CAM_Name0, CAM_Name1, ...]
"""
if self.is_train and self.data_config['Ncams'] < len(
self.data_config['cams']):
cam_names = np.random.choice(
self.data_config['cams'],
self.data_config['Ncams'],
replace=False)
else:
cam_names = self.data_config['cams']
return cam_names
def sample_augmentation(self, H, W, flip=None, scale=None):
"""
Args:
H:
W:
flip:
scale:
Returns:
            resize: float, resize ratio.
            resize_dims: (resize_W, resize_H)
            crop: (crop_w, crop_h, crop_w + fW, crop_h + fH)
            flip: 0 / 1
            rotate: float, random rotation angle in degrees.
"""
fH, fW = self.data_config['input_size']
if self.is_train:
resize = float(fW) / float(W)
            resize += np.random.uniform(*self.data_config['resize'])  # resize ratio, within [fW/W - 0.06, fW/W + 0.11]
            resize_dims = (int(W * resize), int(H * resize))  # size after resize
newW, newH = resize_dims
crop_h = int((1 - np.random.uniform(*self.data_config['crop_h'])) *
newH) - fH # s * H - H_in
crop_w = int(np.random.uniform(0, max(0, newW - fW))) # max(0, s * W - fW)
crop = (crop_w, crop_h, crop_w + fW, crop_h + fH)
flip = self.data_config['flip'] and np.random.choice([0, 1])
rotate = np.random.uniform(*self.data_config['rot'])
else:
resize = float(fW) / float(W)
if scale is not None:
resize += scale
else:
resize += self.data_config.get('resize_test', 0.0)
resize_dims = (int(W * resize), int(H * resize))
newW, newH = resize_dims
crop_h = int((1 - np.mean(self.data_config['crop_h'])) * newH) - fH
crop_w = int(max(0, newW - fW) / 2)
crop = (crop_w, crop_h, crop_w + fW, crop_h + fH)
flip = False if flip is None else flip
rotate = 0
return resize, resize_dims, crop, flip, rotate
def img_transform_core(self, img, resize_dims, crop, flip, rotate):
# adjust image
img = img.resize(resize_dims)
img = img.crop(crop)
if flip:
img = img.transpose(method=Image.FLIP_LEFT_RIGHT)
img = img.rotate(rotate)
return img
def get_rot(self, h):
return torch.Tensor([
[np.cos(h), np.sin(h)],
[-np.sin(h), np.cos(h)],
])
def img_transform(self, img, post_rot, post_tran, resize, resize_dims,
crop, flip, rotate):
"""
Args:
img: PIL.Image
            post_rot: torch.eye(2)
            post_tran: torch.zeros(2)
            resize: float, resize ratio.
            resize_dims: Tuple(W, H), image size after resize.
            crop: (crop_w, crop_h, crop_w + fW, crop_h + fH)
            flip: bool
            rotate: float, rotation angle in degrees.
Returns:
img: PIL.Image
post_rot: Tensor (2, 2)
post_tran: Tensor (2, )
"""
# adjust image
img = self.img_transform_core(img, resize_dims, crop, flip, rotate)
# post-homography transformation
        # express the above transformations as matrices
post_rot *= resize
post_tran -= torch.Tensor(crop[:2])
if flip:
A = torch.Tensor([[-1, 0], [0, 1]])
b = torch.Tensor([crop[2] - crop[0], 0])
post_rot = A.matmul(post_rot)
post_tran = A.matmul(post_tran) + b
A = self.get_rot(rotate / 180 * np.pi)
b = torch.Tensor([crop[2] - crop[0], crop[3] - crop[1]]) / 2
b = A.matmul(-b) + b
post_rot = A.matmul(post_rot)
post_tran = A.matmul(post_tran) + b
return img, post_rot, post_tran
def get_sensor_transforms(self, info, cam_name):
"""
Args:
info:
            cam_name: the camera to read.
Returns:
sensor2ego: (4, 4)
ego2global: (4, 4)
"""
        w, x, y, z = info['cams'][cam_name]['sensor2ego_rotation']    # quaternion (w, x, y, z)
# sensor to ego
sensor2ego_rot = torch.Tensor(
Quaternion(w, x, y, z).rotation_matrix) # (3, 3)
sensor2ego_tran = torch.Tensor(
info['cams'][cam_name]['sensor2ego_translation']) # (3, )
sensor2ego = sensor2ego_rot.new_zeros((4, 4))
sensor2ego[3, 3] = 1
sensor2ego[:3, :3] = sensor2ego_rot
sensor2ego[:3, -1] = sensor2ego_tran
# ego to global
        w, x, y, z = info['cams'][cam_name]['ego2global_rotation']    # quaternion (w, x, y, z)
ego2global_rot = torch.Tensor(
Quaternion(w, x, y, z).rotation_matrix) # (3, 3)
ego2global_tran = torch.Tensor(
info['cams'][cam_name]['ego2global_translation']) # (3, )
ego2global = ego2global_rot.new_zeros((4, 4))
ego2global[3, 3] = 1
ego2global[:3, :3] = ego2global_rot
ego2global[:3, -1] = ego2global_tran
return sensor2ego, ego2global
def get_inputs(self, results, flip=None, scale=None):
"""
Args:
results:
flip:
scale:
Returns:
imgs: (N_views, 3, H, W) # N_views = 6 * (N_history + 1)
sensor2egos: (N_views, 4, 4)
ego2globals: (N_views, 4, 4)
intrins: (N_views, 3, 3)
post_rots: (N_views, 3, 3)
post_trans: (N_views, 3)
"""
imgs = []
sensor2egos = []
ego2globals = []
intrins = []
post_rots = []
post_trans = []
cam_names = self.choose_cams()
results['cam_names'] = cam_names
canvas = []
for cam_name in cam_names:
cam_data = results['curr']['cams'][cam_name]
filename = cam_data['data_path']
img = Image.open(filename)
            # initialize the rotation and translation of the image augmentation
            post_rot = torch.eye(2)
            post_tran = torch.zeros(2)
            # intrinsics of the current camera
            intrin = torch.Tensor(cam_data['cam_intrinsic'])
            # get the sensor2ego (4, 4) and ego2global (4, 4) matrices of the current camera
sensor2ego, ego2global = \
self.get_sensor_transforms(results['curr'], cam_name)
# image view augmentation (resize, crop, horizontal flip, rotate)
img_augs = self.sample_augmentation(
H=img.height, W=img.width, flip=flip, scale=scale)
resize, resize_dims, crop, flip, rotate = img_augs
# img: PIL.Image; post_rot: Tensor (2, 2); post_tran: Tensor (2, )
img, post_rot2, post_tran2 = \
self.img_transform(img, post_rot,
post_tran,
resize=resize,
resize_dims=resize_dims,
crop=crop,
flip=flip,
rotate=rotate)
# for convenience, make augmentation matrices 3x3
            # represent the image augmentation as 3x3 matrices
post_tran = torch.zeros(3)
post_rot = torch.eye(3)
post_tran[:2] = post_tran2
post_rot[:2, :2] = post_rot2
            canvas.append(np.array(img))  # keep the un-normalized image, presumably for visualization
imgs.append(self.normalize_img(img))
if self.sequential:
assert 'adjacent' in results
for adj_info in results['adjacent']:
filename_adj = adj_info['cams'][cam_name]['data_path']
img_adjacent = Image.open(filename_adj)
                    # augment the selected adjacent-frame images with the same parameters as the current frame
img_adjacent = self.img_transform_core(
img_adjacent,
resize_dims=resize_dims,
crop=crop,
flip=flip,
rotate=rotate)
imgs.append(self.normalize_img(img_adjacent))
            intrins.append(intrin)              # camera intrinsics (3, 3)
            sensor2egos.append(sensor2ego)      # camera-to-ego transform (4, 4)
            ego2globals.append(ego2global)      # ego-to-global transform (4, 4)
            post_rots.append(post_rot)          # image augmentation rotation (3, 3)
            post_trans.append(post_tran)        # image augmentation translation (3, )
if self.sequential:
for adj_info in results['adjacent']:
                # adjacent frames reuse the image augmentation and intrinsics of the current frame
post_trans.extend(post_trans[:len(cam_names)])
post_rots.extend(post_rots[:len(cam_names)])
intrins.extend(intrins[:len(cam_names)])
for cam_name in cam_names:
                    # get the camera2ego (4, 4) and ego2global (4, 4) transforms of the adjacent frame
sensor2ego, ego2global = \
self.get_sensor_transforms(adj_info, cam_name)
sensor2egos.append(sensor2ego)
ego2globals.append(ego2global)
imgs = torch.stack(imgs) # (N_views, 3, H, W) # N_views = 6 * (N_history + 1)
sensor2egos = torch.stack(sensor2egos) # (N_views, 4, 4)
ego2globals = torch.stack(ego2globals) # (N_views, 4, 4)
intrins = torch.stack(intrins) # (N_views, 3, 3)
post_rots = torch.stack(post_rots) # (N_views, 3, 3)
post_trans = torch.stack(post_trans) # (N_views, 3)
results['canvas'] = canvas # List[(H, W, 3), (H, W, 3), ...] len = 6
return imgs, sensor2egos, ego2globals, intrins, post_rots, post_trans
def __call__(self, results):
results['img_inputs'] = self.get_inputs(results)
return results
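# --- Illustrative sketch (not part of the original file) --------------------
# The eval branch of sample_augmentation shrinks the image so its width
# matches the target width, then center-crops horizontally and keeps the
# bottom fH rows. With the common nuScenes size 1600x900 and an assumed
# input_size of (256, 704) and crop_h range (0.0, 0.0):
def _demo_eval_augmentation():
    H, W = 900, 1600
    fH, fW = 256, 704
    resize = float(fW) / float(W)                      # 0.44
    newW, newH = int(W * resize), int(H * resize)      # 704, 396
    crop_h = int((1 - 0.0) * newH) - fH                # 140: drop the top rows
    crop_w = int(max(0, newW - fW) / 2)                # 0: width already matches
    crop = (crop_w, crop_h, crop_w + fW, crop_h + fH)  # (0, 140, 704, 396)
    return resize, (newW, newH), crop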
@PIPELINES.register_module()
class LoadAnnotationsBEVDepth(object):
def __init__(self, bda_aug_conf, classes, is_train=True):
self.bda_aug_conf = bda_aug_conf
self.is_train = is_train
self.classes = classes
def sample_bda_augmentation(self):
"""Generate bda augmentation values based on bda_config."""
if self.is_train:
rotate_bda = np.random.uniform(*self.bda_aug_conf['rot_lim'])
scale_bda = np.random.uniform(*self.bda_aug_conf['scale_lim'])
flip_dx = np.random.uniform() < self.bda_aug_conf['flip_dx_ratio']
flip_dy = np.random.uniform() < self.bda_aug_conf['flip_dy_ratio']
else:
rotate_bda = 0
scale_bda = 1.0
flip_dx = False
flip_dy = False
return rotate_bda, scale_bda, flip_dx, flip_dy
def bev_transform(self, gt_boxes, rotate_angle, scale_ratio, flip_dx,
flip_dy):
"""
Args:
gt_boxes: (N, 9)
rotate_angle:
scale_ratio:
flip_dx: bool
flip_dy: bool
Returns:
gt_boxes: (N, 9)
rot_mat: (3, 3)
"""
rotate_angle = torch.tensor(rotate_angle / 180 * np.pi)
rot_sin = torch.sin(rotate_angle)
rot_cos = torch.cos(rotate_angle)
rot_mat = torch.Tensor([[rot_cos, -rot_sin, 0], [rot_sin, rot_cos, 0],
[0, 0, 1]])
scale_mat = torch.Tensor([[scale_ratio, 0, 0], [0, scale_ratio, 0],
[0, 0, scale_ratio]])
flip_mat = torch.Tensor([[1, 0, 0], [0, 1, 0], [0, 0, 1]])
        if flip_dx:  # mirror along the y-axis (negate x)
            flip_mat = flip_mat @ torch.Tensor([[-1, 0, 0], [0, 1, 0],
                                                [0, 0, 1]])
        if flip_dy:  # mirror along the x-axis (negate y)
            flip_mat = flip_mat @ torch.Tensor([[1, 0, 0], [0, -1, 0],
                                                [0, 0, 1]])
        rot_mat = flip_mat @ (scale_mat @ rot_mat)  # combined transform matrix (3, 3)
        if gt_boxes.shape[0] > 0:
            gt_boxes[:, :3] = (
                rot_mat @ gt_boxes[:, :3].unsqueeze(-1)).squeeze(-1)  # transformed 3D box centers
            gt_boxes[:, 3:6] *= scale_ratio  # transformed 3D box sizes
            gt_boxes[:, 6] += rotate_angle  # yaw after rotation
            # flipping also changes the yaw
if flip_dx:
gt_boxes[:, 6] = 2 * torch.asin(torch.tensor(1.0)) - gt_boxes[:, 6]
if flip_dy:
gt_boxes[:, 6] = -gt_boxes[:, 6]
gt_boxes[:, 7:] = (
rot_mat[:2, :2] @ gt_boxes[:, 7:].unsqueeze(-1)).squeeze(-1)
return gt_boxes, rot_mat
def __call__(self, results):
gt_boxes, gt_labels = results['ann_infos'] # (N_gt, 9), (N_gt, )
gt_boxes, gt_labels = torch.Tensor(np.array(gt_boxes)), torch.tensor(np.array(gt_labels))
rotate_bda, scale_bda, flip_dx, flip_dy = self.sample_bda_augmentation()
bda_mat = torch.zeros(4, 4)
bda_mat[3, 3] = 1
        # gt_boxes: (N, 9), 3D boxes after the BEV augmentation
        # bda_rot: (3, 3), BEV augmentation matrix (rotation, scale and flip)
gt_boxes, bda_rot = self.bev_transform(gt_boxes, rotate_bda, scale_bda,
flip_dx, flip_dy)
bda_mat[:3, :3] = bda_rot
if len(gt_boxes) == 0:
gt_boxes = torch.zeros(0, 9)
results['gt_bboxes_3d'] = \
LiDARInstance3DBoxes(gt_boxes, box_dim=gt_boxes.shape[-1],
origin=(0.5, 0.5, 0.5))
results['gt_labels_3d'] = gt_labels
imgs, sensor2egos, ego2globals, intrins = results['img_inputs'][:4]
post_rots, post_trans = results['img_inputs'][4:]
results['img_inputs'] = (imgs, sensor2egos, ego2globals, intrins, post_rots,
post_trans, bda_rot)
results['flip_dx'] = flip_dx
results['flip_dy'] = flip_dy
results['rotate_bda'] = rotate_bda
results['scale_bda'] = scale_bda
# if 'voxel_semantics' in results:
# if flip_dx:
# results['voxel_semantics'] = results['voxel_semantics'][::-1, ...].copy()
# results['mask_lidar'] = results['mask_lidar'][::-1, ...].copy()
# results['mask_camera'] = results['mask_camera'][::-1, ...].copy()
# if flip_dy:
# results['voxel_semantics'] = results['voxel_semantics'][:, ::-1, ...].copy()
# results['mask_lidar'] = results['mask_lidar'][:, ::-1, ...].copy()
# results['mask_camera'] = results['mask_camera'][:, ::-1, ...].copy()
return results
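# --- Illustrative sketch (not part of the original file) --------------------
# bev_transform composes flip @ scale @ rotation and applies it to the box
# centers, sizes, yaw and velocity. A dummy box rotated by 90 degrees
# (the bda_aug_conf/classes arguments are unused placeholders here):
def _demo_bev_transform():
    loader = LoadAnnotationsBEVDepth(bda_aug_conf={}, classes=[], is_train=False)
    # (x, y, z, dx, dy, dz, yaw, vx, vy)
    box = torch.Tensor([[1.0, 0.0, 0.0, 4.0, 2.0, 1.5, 0.0, 1.0, 0.0]])
    box, rot_mat = loader.bev_transform(box, rotate_angle=90, scale_ratio=1.0,
                                        flip_dx=False, flip_dy=False)
    # center (1, 0, 0) -> (0, 1, 0), yaw 0 -> pi/2, velocity rotated the same way
    return box, rot_mat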
@PIPELINES.register_module()
class PointToMultiViewDepth(object):
def __init__(self, grid_config, downsample=1):
self.downsample = downsample
self.grid_config = grid_config
def points2depthmap(self, points, height, width):
"""
Args:
points: (N_points, 3): 3: (u, v, d)
height: int
width: int
Returns:
depth_map:(H, W)
"""
height, width = height // self.downsample, width // self.downsample
depth_map = torch.zeros((height, width), dtype=torch.float32)
coor = torch.round(points[:, :2] / self.downsample) # (N_points, 2) 2: (u, v)
        depth = points[:, 2]  # (N_points, )
kept1 = (coor[:, 0] >= 0) & (coor[:, 0] < width) & (
coor[:, 1] >= 0) & (coor[:, 1] < height) & (
depth < self.grid_config['depth'][1]) & (
depth >= self.grid_config['depth'][0])
        # keep only the valid projected points
coor, depth = coor[kept1], depth[kept1] # (N, 2), (N, )
ranks = coor[:, 0] + coor[:, 1] * width
sort = (ranks + depth / 100.).argsort()
coor, depth, ranks = coor[sort], depth[sort], ranks[sort]
kept2 = torch.ones(coor.shape[0], device=coor.device, dtype=torch.bool)
kept2[1:] = (ranks[1:] != ranks[:-1])
coor, depth = coor[kept2], depth[kept2]
coor = coor.to(torch.long)
depth_map[coor[:, 1], coor[:, 0]] = depth
return depth_map
def __call__(self, results):
points_lidar = results['points']
imgs, sensor2egos, ego2globals, intrins = results['img_inputs'][:4]
post_rots, post_trans, bda = results['img_inputs'][4:]
depth_map_list = []
for cid in range(len(results['cam_names'])):
cam_name = results['cam_names'][cid] # CAM_TYPE
            # lidar and camera are presumably not strictly synchronized, so lidar_ego and cam_ego may differ;
            # hence lidar --> cam is not computed as: lidar --> ego --> cam
            # but as: lidar --> lidar_ego --> global --> cam_ego --> cam
lidar2lidarego = np.eye(4, dtype=np.float32)
lidar2lidarego[:3, :3] = Quaternion(
results['curr']['lidar2ego_rotation']).rotation_matrix
lidar2lidarego[:3, 3] = results['curr']['lidar2ego_translation']
lidar2lidarego = torch.from_numpy(lidar2lidarego)
lidarego2global = np.eye(4, dtype=np.float32)
lidarego2global[:3, :3] = Quaternion(
results['curr']['ego2global_rotation']).rotation_matrix
lidarego2global[:3, 3] = results['curr']['ego2global_translation']
lidarego2global = torch.from_numpy(lidarego2global)
cam2camego = np.eye(4, dtype=np.float32)
cam2camego[:3, :3] = Quaternion(
results['curr']['cams'][cam_name]
['sensor2ego_rotation']).rotation_matrix
cam2camego[:3, 3] = results['curr']['cams'][cam_name][
'sensor2ego_translation']
cam2camego = torch.from_numpy(cam2camego)
camego2global = np.eye(4, dtype=np.float32)
camego2global[:3, :3] = Quaternion(
results['curr']['cams'][cam_name]
['ego2global_rotation']).rotation_matrix
camego2global[:3, 3] = results['curr']['cams'][cam_name][
'ego2global_translation']
camego2global = torch.from_numpy(camego2global)
cam2img = np.eye(4, dtype=np.float32)
cam2img = torch.from_numpy(cam2img)
cam2img[:3, :3] = intrins[cid]
# lidar --> lidar_ego --> global --> cam_ego --> cam
lidar2cam = torch.inverse(camego2global.matmul(cam2camego)).matmul(
lidarego2global.matmul(lidar2lidarego))
lidar2img = cam2img.matmul(lidar2cam)
points_img = points_lidar.tensor[:, :3].matmul(
lidar2img[:3, :3].T) + lidar2img[:3, 3].unsqueeze(0) # (N_points, 3) 3: (ud, vd, d)
points_img = torch.cat(
[points_img[:, :2] / points_img[:, 2:3], points_img[:, 2:3]],
1) # (N_points, 3): 3: (u, v, d)
            # then apply the image-view augmentation
points_img = points_img.matmul(
post_rots[cid].T) + post_trans[cid:cid + 1, :] # (N_points, 3): 3: (u, v, d)
depth_map = self.points2depthmap(points_img,
imgs.shape[2], # H
imgs.shape[3] # W
)
depth_map_list.append(depth_map)
depth_map = torch.stack(depth_map_list)
results['gt_depth'] = depth_map
return results
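# --- Illustrative sketch (not part of the original file) --------------------
# points2depthmap keeps a single depth per pixel: points are ranked by their
# flattened pixel index, sorted with a small depth-dependent tie-breaker, and
# only the first (closest) point of each pixel survives. A tiny example:
def _demo_depth_dedup():
    coor = torch.tensor([[2., 1.], [2., 1.], [0., 0.]])   # (u, v) pixel coords
    depth = torch.tensor([30., 10., 5.])                  # two points hit pixel (2, 1)
    width = 4
    ranks = coor[:, 0] + coor[:, 1] * width
    order = (ranks + depth / 100.).argsort()
    coor, depth, ranks = coor[order], depth[order], ranks[order]
    keep = torch.ones(coor.shape[0], dtype=torch.bool)
    keep[1:] = ranks[1:] != ranks[:-1]
    return coor[keep], depth[keep]  # pixel (2, 1) keeps depth 10, the closer hit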
@PIPELINES.register_module()
class LoadOccGTFromFile(object):
def __call__(self, results):
occ_gt_path = results['occ_gt_path']
occ_gt_path = os.path.join(occ_gt_path, "labels.npz")
occ_labels = np.load(occ_gt_path)
semantics = occ_labels['semantics']
mask_lidar = occ_labels['mask_lidar']
mask_camera = occ_labels['mask_camera']
semantics = torch.from_numpy(semantics)
mask_lidar = torch.from_numpy(mask_lidar)
mask_camera = torch.from_numpy(mask_camera)
if results.get('flip_dx', False):
semantics = torch.flip(semantics, [0])
mask_lidar = torch.flip(mask_lidar, [0])
mask_camera = torch.flip(mask_camera, [0])
if results.get('flip_dy', False):
semantics = torch.flip(semantics, [1])
mask_lidar = torch.flip(mask_lidar, [1])
mask_camera = torch.flip(mask_camera, [1])
results['voxel_semantics'] = semantics
results['mask_lidar'] = mask_lidar
results['mask_camera'] = mask_camera
return results
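# --- Illustrative sketch (not part of the original file) --------------------
# LoadOccGTFromFile mirrors the occupancy labels along x / y whenever the
# preceding LoadAnnotationsBEVDepth step sampled flip_dx / flip_dy, so the
# voxel GT stays aligned with the flipped features. Minimal check on a dummy grid:
def _demo_occ_flip_consistency():
    semantics = torch.arange(8).reshape(2, 2, 2)
    flipped_dx = torch.flip(semantics, [0])   # what flip_dx=True applies
    flipped_dy = torch.flip(semantics, [1])   # what flip_dy=True applies
    assert torch.equal(torch.flip(flipped_dx, [0]), semantics)  # flips are involutions
    return flipped_dx, flipped_dy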
from .backbones import *
from .necks import *
from .dense_heads import *
from .detectors import *
from .losses import *
from mmdet.models.backbones import ResNet
from .resnet import CustomResNet
from .swin import SwinTransformer
__all__ = ['ResNet', 'CustomResNet', 'SwinTransformer']