Commit fea61d34 authored by Shaoshuai Shi's avatar Shaoshuai Shi
Browse files

add dataset template codes with data_augmentor and data_processor structure in...

add dataset template codes with data_augmentor and data_processor structure in the unified normative coodinates, and utils codes
parent 760a9d2c
import torch
from torch.utils.data import DataLoader
from .dataset import DatasetTemplate
from .kitti.kitti_dataset import KittiDataset
__all__ = {
'DatasetTemplate': DatasetTemplate,
'KittiDataset': KittiDataset,
}
def build_dataloader(dataset_cfg, class_names, batch_size, dist, root_path=None, workers=4,
logger=None, training=True):
dataset = __all__[dataset_cfg.DATASET](
dataset_cfg=dataset_cfg,
class_names=class_names,
root_path=root_path,
training=training,
logger=logger,
)
sampler = torch.utils.data.distributed.DistributedSampler(dataset) if dist else None
dataloader = DataLoader(
dataset, batch_size=batch_size, pin_memory=True, num_workers=workers,
shuffle=(sampler is None) and training, collate_fn=dataset.collate_batch,
drop_last=False, sampler=sampler, timeout=0
)
return dataset, dataloader, sampler
import numpy as np
from ...utils import common_utils
def random_flip_along_x(gt_boxes, points):
"""
Args:
gt_boxes: (N, 7), [x, y, z, dx, dy, dz, heading]
points: (M, 3 + C)
Returns:
"""
enable = np.random.choice([False, True], replace=False, p=[0.5, 0.5])
if enable:
gt_boxes[:, 1] = -gt_boxes[:, 1]
gt_boxes[:, 6] = -gt_boxes[:, 6]
points[:, 1] = -points[:, 1]
return gt_boxes, points
def random_flip_along_y(gt_boxes, points):
"""
Args:
gt_boxes: (N, 7), [x, y, z, dx, dy, dz, heading]
points: (M, 3 + C)
Returns:
"""
enable = np.random.choice([False, True], replace=False, p=[0.5, 0.5])
if enable:
gt_boxes[:, 0] = -gt_boxes[:, 0]
gt_boxes[:, 6] = -(gt_boxes[:, 6] + np.pi)
points[:, 0] = -points[:, 0]
return gt_boxes, points
def global_rotation(gt_boxes, points, rot_range):
"""
Args:
gt_boxes: (N, 7), [x, y, z, dx, dy, dz, heading]
points: (M, 3 + C),
rot_range: [min, max]
Returns:
"""
noise_rotation = np.random.uniform(rot_range[0], rot_range[1])
points = common_utils.rotate_points_along_z(points[np.newaxis, :, :], np.array([noise_rotation]))[0]
gt_boxes[:, 0:3] = common_utils.rotate_points_along_z(gt_boxes[np.newaxis, :, 0:3], np.array([noise_rotation]))[0]
gt_boxes[:, 6] += noise_rotation
return gt_boxes, points
def global_scaling(gt_boxes, points, scale_range):
"""
Args:
gt_boxes: (N, 7), [x, y, z, dx, dy, dz, heading]
points: (M, 3 + C),
scale_range: [min, max]
Returns:
"""
if scale_range[1] - scale_range[0] < 1e-3:
return gt_boxes, points
noise_scale = np.random.uniform(scale_range[0], scale_range[1])
points[:, :3] *= noise_scale
gt_boxes[:, :6] *= noise_scale
return gt_boxes, points
from functools import partial
import numpy as np
from . import augmentor_utils, database_sampler
from ...utils import common_utils
def register_function_augmentor(src_func):
def kernel_func(self, data_dict=None, config=None):
if data_dict is None:
return partial(src_func, self=self, config=config)
return src_func
return kernel_func
class DataAugmentor(object):
def __init__(self, root_path, augmentor_configs, class_names, logger=None):
self.root_path = root_path
self.class_names = class_names
self.logger = logger
self.data_augmentor_queue = []
for cur_cfg in augmentor_configs:
cur_augmentor = getattr(self, cur_cfg.NAME)(config=cur_cfg)
self.data_augmentor_queue.append(cur_augmentor)
def gt_sampling(self, config=None):
db_sampler = database_sampler.DataBaseSampler(
root_path=self.root_path,
sampler_cfg=config,
class_names=self.class_names,
logger=self.logger
)
return db_sampler
@register_function_augmentor
def random_world_flip(self, data_dict=None, config=None):
gt_boxes, points = data_dict['gt_boxes'], data_dict['points']
for cur_axis in config['ALONG_AXIS_LIST']:
assert cur_axis in ['x', 'y']
gt_boxes, points = getattr(augmentor_utils, 'random_flip_along_%s' % cur_axis)(
gt_boxes, points,
)
data_dict['gt_boxes'] = gt_boxes
data_dict['points'] = points
return data_dict
@register_function_augmentor
def random_world_rotation(self, data_dict=None, config=None):
rot_range = config['WORLD_ROT_ANGLE']
if not isinstance(rot_range, list):
rot_range = [-rot_range, rot_range]
gt_boxes, points = augmentor_utils.global_rotation(
data_dict['gt_boxes'], data_dict['points'], rot_range=rot_range
)
data_dict['gt_boxes'] = gt_boxes
data_dict['points'] = points
return data_dict
@register_function_augmentor
def random_world_scaling(self, data_dict=None, config=None):
gt_boxes, points = augmentor_utils.global_scaling(
data_dict['gt_boxes'], data_dict['points'], config['WORLD_SCALE_RANGE']
)
data_dict['gt_boxes'] = gt_boxes
data_dict['points'] = points
return data_dict
def forward(self, data_dict):
"""
Args:
data_dict:
points: (N, 3 + C_in)
gt_boxes: optional, (N, 7) [x, y, z, dx, dy, dz, heading]
gt_names: optional, (N), string
...
Returns:
"""
for cur_augmentor in self.data_augmentor_queue:
data_dict = cur_augmentor(data_dict=data_dict)
data_dict['gt_boxes'][:, 6] = common_utils.limit_period(
data_dict['gt_boxes'][:, 6], offset=0.5, period=2 * np.pi
)
if 'calib' in data_dict:
data_dict.pop('calib')
if 'road_plane' in data_dict:
data_dict.pop('road_plane')
if 'gt_boxes_mask' in data_dict:
gt_boxes_mask = data_dict['gt_boxes_mask']
data_dict['gt_boxes'] = data_dict['gt_boxes'][gt_boxes_mask]
data_dict['gt_names'] = data_dict['gt_names'][gt_boxes_mask]
data_dict.pop('gt_boxes_mask')
return data_dict
import numpy as np
import pickle
from ...utils import box_utils
from ...ops.iou3d_nms import iou3d_nms_utils
class DataBaseSampler(object):
def __init__(self, root_path, sampler_cfg, class_names, logger=None):
self.root_path = root_path
self.class_names = class_names
self.sampler_cfg = sampler_cfg
self.logger = logger
self.db_infos = {}
for class_name in class_names:
self.db_infos[class_name] = []
for db_info_path in sampler_cfg.DB_INFO_PATH:
db_info_path = self.root_path.resolve() / db_info_path
with open(str(db_info_path), 'rb') as f:
infos = pickle.load(f)
[self.db_infos[cur_class].extend(infos[cur_class]) for cur_class in class_names]
for func_name, val in sampler_cfg.PREPARE.items():
self.db_infos = getattr(self, func_name)(self.db_infos, val)
self.sample_groups = {}
self.sample_class_num = {}
self.limit_whole_scene = sampler_cfg.get('LIMIT_WHOLE_SCENE', False)
for x in sampler_cfg.SAMPLE_GROUPS:
class_name, sample_num = x.split(':')
if class_name not in class_names:
continue
self.sample_class_num[class_name] = sample_num
self.sample_groups[class_name] = {
'sample_num': sample_num,
'pointer': len(self.db_infos[class_name]),
'indices': np.arange(len(self.db_infos[class_name]))
}
def filter_by_difficulty(self, db_infos, removed_difficulty):
new_db_infos = {}
for key, dinfos in db_infos.items():
pre_len = len(dinfos)
new_db_infos[key] = [
info for info in dinfos
if info['difficulty'] not in removed_difficulty
]
if self.logger is not None:
self.logger.info('Database filter by difficulty %s: %d => %d' % (key, pre_len, len(new_db_infos[key])))
return new_db_infos
def filter_by_min_points(self, db_infos, min_gt_points_list):
for name_num in min_gt_points_list:
name, min_num = name_num.split(':')
min_num = int(min_num)
if min_num > 0 and name in db_infos.keys():
filtered_infos = []
for info in db_infos[name]:
if info['num_points_in_gt'] >= min_num:
filtered_infos.append(info)
if self.logger is not None:
self.logger.info('Database filter by min points %s: %d => %d' %
(name, len(db_infos[name]), len(filtered_infos)))
db_infos[name] = filtered_infos
return db_infos
def sample_with_fixed_number(self, class_name, sample_group):
"""
Args:
class_name:
sample_group:
Returns:
"""
sample_num, pointer, indices = int(sample_group['sample_num']), sample_group['pointer'], sample_group['indices']
if pointer >= len(self.db_infos[class_name]):
indices = np.random.permutation(len(self.db_infos[class_name]))
pointer = 0
sampled_dict = [self.db_infos[class_name][idx] for idx in indices[pointer: pointer + sample_num]]
pointer += sample_num
sample_group['pointer'] = pointer
sample_group['indices'] = indices
return sampled_dict
@staticmethod
def put_boxes_on_road_planes(gt_boxes, road_planes, calib):
"""
Only validate in KITTIDataset
Args:
gt_boxes: (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
road_planes: [a, b, c, d]
calib:
Returns:
"""
a, b, c, d = road_planes
center_cam = calib.lidar_to_rect(gt_boxes[:, 0:3])
cur_height_cam = (-d - a * center_cam[:, 0] - c * center_cam[:, 2]) / b
center_cam[:, 1] = cur_height_cam
cur_lidar_height = calib.rect_to_lidar(center_cam)[:, 2]
mv_height = gt_boxes[:, 2] - gt_boxes[:, 5] / 2 - cur_lidar_height
gt_boxes[:, 2] -= mv_height # lidar view
return gt_boxes, mv_height
def add_sampled_boxes_to_scene(self, data_dict, sampled_gt_boxes, total_valid_sampled_dict):
gt_boxes_mask = data_dict['gt_boxes_mask']
gt_boxes = data_dict['gt_boxes'][gt_boxes_mask]
gt_names = data_dict['gt_names'][gt_boxes_mask]
points = data_dict['points']
if self.sampler_cfg.get('USE_ROAD_PLANE', False):
sampled_gt_boxes, mv_height = self.put_boxes_on_road_planes(
sampled_gt_boxes, data_dict['road_plane'], data_dict['calib']
)
data_dict.pop('calib')
data_dict.pop('road_plane')
obj_points_list = []
for idx, info in enumerate(total_valid_sampled_dict):
file_path = self.root_path / info['path']
obj_points = np.fromfile(str(file_path), dtype=np.float32).reshape(
[-1, self.sampler_cfg.NUM_POINT_FEATURES])
obj_points[:, :3] += info['box3d_lidar'][:3]
if self.sampler_cfg.get('USE_ROAD_PLANE', False):
# mv height
obj_points[:, 2] -= mv_height[idx]
obj_points_list.append(obj_points)
obj_points = np.concatenate(obj_points_list, axis=0)
sampled_gt_names = np.array([x['name'] for x in total_valid_sampled_dict])
large_sampled_gt_boxes = box_utils.enlarge_box3d(
sampled_gt_boxes[:, 0:7], extra_width=self.sampler_cfg.REMOVE_EXTRA_WIDTH
)
points = box_utils.remove_points_in_boxes3d(points, large_sampled_gt_boxes)
points = np.concatenate([obj_points, points], axis=0)
gt_names = np.concatenate([gt_names, sampled_gt_names], axis=0)
gt_boxes = np.concatenate([gt_boxes, sampled_gt_boxes], axis=0)
data_dict['gt_boxes'] = gt_boxes
data_dict['gt_names'] = gt_names
data_dict['points'] = points
return data_dict
def __call__(self, data_dict):
"""
Args:
data_dict:
gt_boxes: (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
Returns:
"""
gt_boxes = data_dict['gt_boxes']
gt_names = data_dict['gt_names']
existed_boxes = gt_boxes
total_valid_sampled_dict = []
for class_name, sample_group in self.sample_groups.items():
if self.limit_whole_scene:
num_gt = np.sum(class_name == gt_names)
sample_group['sample_num'] = str(int(self.sample_class_num[class_name]) - num_gt)
if int(sample_group['sample_num']) > 0:
sampled_dict = self.sample_with_fixed_number(class_name, sample_group)
sampled_boxes = np.stack([x['box3d_lidar'] for x in sampled_dict], axis=0).astype(np.float32)
if self.sampler_cfg.get('DATABASE_WITH_FAKELIDAR', False):
sampled_boxes = box_utils.boxes3d_kitti_fakelidar_to_lidar(sampled_boxes)
iou1 = iou3d_nms_utils.boxes_bev_iou_cpu(sampled_boxes, existed_boxes)
iou2 = iou3d_nms_utils.boxes_bev_iou_cpu(sampled_boxes, sampled_boxes)
iou2[range(sampled_boxes.shape[0]), range(sampled_boxes.shape[0])] = 0
valid_mask = ((iou1.max(axis=1) + iou2.max(axis=1)) == 0).nonzero()[0]
valid_sampled_dict = [sampled_dict[x] for x in valid_mask]
valid_sampled_boxes = sampled_boxes[valid_mask]
existed_boxes = np.concatenate((existed_boxes, valid_sampled_boxes), axis=0)
total_valid_sampled_dict.extend(valid_sampled_dict)
sampled_gt_boxes = existed_boxes[gt_boxes.shape[0]:, :]
if total_valid_sampled_dict.__len__() > 0:
data_dict = self.add_sampled_boxes_to_scene(data_dict, sampled_gt_boxes, total_valid_sampled_dict)
data_dict.pop('gt_boxes_mask')
return data_dict
from pathlib import Path
from collections import defaultdict
import numpy as np
import torch.utils.data as torch_data
from .augmentor.data_augmentor import DataAugmentor
from .processor.data_processor import DataProcessor
from .processor.point_feature_encoder import PointFeatureEncoder
from ..utils import common_utils
class DatasetTemplate(torch_data.Dataset):
def __init__(self, dataset_cfg=None, class_names=None, training=True, root_path=None, logger=None):
super().__init__()
self.dataset_cfg = dataset_cfg
self.training = training
self.class_names = class_names
self.logger = logger
self.root_path = root_path if root_path is not None else Path(self.dataset_cfg.DATA_PATH)
self.logger = logger
if self.dataset_cfg is None:
return
self.point_cloud_range = np.array(self.dataset_cfg.POINT_CLOUD_RANGE, dtype=np.float32)
self.point_feature_encoder = PointFeatureEncoder(
self.dataset_cfg.POINT_FEATURE_ENCODING,
point_cloud_range=self.point_cloud_range
)
self.data_augmentor = DataAugmentor(
self.root_path, self.dataset_cfg.DATA_AUGMENTOR, self.class_names, logger=self.logger
) if self.training else None
self.data_processor = DataProcessor(
self.dataset_cfg.DATA_PROCESSOR, point_cloud_range=self.point_cloud_range, training=self.training
)
self.grid_size = self.data_processor.grid_size
self.voxel_size = self.data_processor.voxel_size
@property
def mode(self):
return 'train' if self.training else 'test'
def __len__(self):
raise NotImplementedError
def forward(self, index):
raise NotImplementedError
def prepare_data(self, data_dict):
"""
Args:
data_dict:
points: (N, 3 + C_in)
gt_boxes: optional, (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
gt_names: optional, (N), string
...
Returns:
data_dict:
frame_id: string
points: (N, 3 + C_in)
gt_boxes: optional, (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
gt_names: optional, (N), string
use_lead_xyz: bool
voxels: optional (num_voxels, max_points_per_voxel, 3 + C)
voxel_coords: optional (num_voxels, 3)
voxel_num_points: optional (num_voxels)
...
"""
if self.training:
assert 'gt_boxes' in data_dict, 'gt_boxes should be provided for training'
gt_boxes_mask = np.array([n in self.class_names for n in data_dict['gt_names']], dtype=np.bool_)
data_dict = self.data_augmentor.forward(
data_dict={
**data_dict,
'gt_boxes_mask': gt_boxes_mask
}
)
if len(data_dict['gt_boxes']) == 0:
new_index = np.random.randint(self.__len__())
return self.__getitem__(new_index)
if data_dict.get('gt_boxes', None) is not None:
selected = common_utils.keep_arrays_by_name(data_dict['gt_names'], self.class_names)
data_dict['gt_boxes'] = data_dict['gt_boxes'][selected]
data_dict['gt_names'] = data_dict['gt_names'][selected]
gt_classes = np.array([self.class_names.index(n) + 1 for n in data_dict['gt_names']], dtype=np.int32)
gt_boxes = np.concatenate((data_dict['gt_boxes'], gt_classes.reshape(-1, 1).astype(np.float32)), axis=1)
data_dict['gt_boxes'] = gt_boxes
data_dict = self.point_feature_encoder.forward(data_dict)
data_dict = self.data_processor.forward(
data_dict=data_dict
)
data_dict.pop('gt_names')
return data_dict
@staticmethod
def collate_batch(batch_list, _unused=False):
data_dict = defaultdict(list)
for cur_sample in batch_list:
for key, val in cur_sample.items():
data_dict[key].append(val)
batch_size = len(batch_list)
ret = {}
for key, val in data_dict.items():
if key in ['voxels', 'voxel_num_points']:
ret[key] = np.concatenate(val, axis=0)
elif key in ['points', 'voxel_coords']:
coors = []
for i, coor in enumerate(val):
coor_pad = np.pad(coor, ((0, 0), (1, 0)), mode='constant', constant_values=i)
coors.append(coor_pad)
ret[key] = np.concatenate(coors, axis=0)
elif key in ['gt_boxes']:
max_gt = max([len(x) for x in val])
batch_gt_boxes3d = np.zeros((batch_size, max_gt, val[0].shape[-1]), dtype=np.float32)
for k in range(batch_size):
batch_gt_boxes3d[k, :val[k].__len__(), :] = val[k]
ret[key] = batch_gt_boxes3d
else:
ret[key] = np.stack(val, axis=0)
ret['batch_size'] = batch_size
return ret
import pickle
import copy
import numpy as np
from skimage import io
from ...utils import box_utils, common_utils, calibration_kitti, object3d_kitti
from ..dataset import DatasetTemplate
class KittiDataset(DatasetTemplate):
def __init__(self, dataset_cfg, class_names, training=True, root_path=None, logger=None):
"""
Args:
root_path:
dataset_cfg:
class_names:
training:
logger:
"""
super().__init__(
dataset_cfg=dataset_cfg, class_names=class_names, training=training, root_path=root_path, logger=logger
)
self.split = self.dataset_cfg.DATA_SPLIT[self.mode]
self.root_split_path = self.root_path / ('training' if self.split != 'test' else 'testing')
split_dir = self.root_path / 'ImageSets' / (self.split + '.txt')
self.sample_id_list = [x.strip() for x in open(split_dir).readlines()] if split_dir.exists() else None
self.kitti_infos = []
self.include_kitti_data(self.mode)
def include_kitti_data(self, mode):
if self.logger is not None:
self.logger.info('Loading KITTI dataset')
kitti_infos = []
for info_path in self.dataset_cfg.INFO_PATH[mode]:
info_path = self.root_path / info_path
with open(info_path, 'rb') as f:
infos = pickle.load(f)
kitti_infos.extend(infos)
self.kitti_infos.extend(kitti_infos)
if self.logger is not None:
self.logger.info('Total samples for KITTI dataset: %d' % (len(kitti_infos)))
def get_lidar(self, idx):
lidar_file = self.root_split_path / 'velodyne' / ('%s.bin' % idx)
assert lidar_file.exists()
return np.fromfile(str(lidar_file), dtype=np.float32).reshape(-1, 4)
def get_image_shape(self, idx):
img_file = self.root_split_path / 'image_2' / ('%s.png' % idx)
assert img_file.exists()
return np.array(io.imread(img_file).shape[:2], dtype=np.int32)
def get_label(self, idx):
label_file = self.root_split_path / 'label_2' / ('%s.txt' % idx)
assert label_file.exists()
return object3d_kitti.get_objects_from_label(label_file)
def get_calib(self, idx):
calib_file = self.root_split_path / 'calib' / ('%s.txt' % idx)
assert calib_file.exists()
return calibration_kitti.Calibration(calib_file)
def get_road_plane(self, idx):
plane_file = self.root_split_path / 'planes' / ('%s.txt' % idx)
if not plane_file.exists():
return None
with open(plane_file, 'r') as f:
lines = f.readlines()
lines = [float(i) for i in lines[3].split()]
plane = np.asarray(lines)
# Ensure normal is always facing up, this is in the rectified camera coordinate
if plane[1] > 0:
plane = -plane
norm = np.linalg.norm(plane[0:3])
plane = plane / norm
return plane
@staticmethod
def get_fov_flag(pts_rect, img_shape, calib):
"""
Args:
pts_rect:
img_shape:
calib:
Returns:
"""
pts_img, pts_rect_depth = calib.rect_to_img(pts_rect)
val_flag_1 = np.logical_and(pts_img[:, 0] >= 0, pts_img[:, 0] < img_shape[1])
val_flag_2 = np.logical_and(pts_img[:, 1] >= 0, pts_img[:, 1] < img_shape[0])
val_flag_merge = np.logical_and(val_flag_1, val_flag_2)
pts_valid_flag = np.logical_and(val_flag_merge, pts_rect_depth >= 0)
return pts_valid_flag
@staticmethod
def generate_prediction_dicts(batch_dict, pred_dicts, class_names, output_path=None):
"""
Args:
batch_dict:
frame_id:
pred_dicts: list of pred_dicts
pred_boxes: (N, 7), Tensor
pred_scores: (N), Tensor
pred_labels: (N), Tensor
class_names:
output_path:
Returns:
"""
def get_template_prediction(num_samples):
ret_dict = {
'name': np.zeros(num_samples), 'truncated': np.zeros(num_samples),
'occluded': np.zeros(num_samples), 'alpha': np.zeros(num_samples),
'bbox': np.zeros([num_samples, 4]), 'dimensions': np.zeros([num_samples, 3]),
'location': np.zeros([num_samples, 3]), 'rotation_y': np.zeros(num_samples),
'score': np.zeros(num_samples), 'boxes_lidar': np.zeros([num_samples, 7])
}
return ret_dict
def generate_single_sample_dict(batch_index, box_dict):
pred_scores = box_dict['pred_scores'].cpu().numpy()
pred_boxes = box_dict['pred_boxes'].cpu().numpy()
pred_labels = box_dict['pred_labels'].cpu().numpy()
pred_dict = get_template_prediction(pred_scores.shape[0])
if pred_scores.shape[0] == 0:
return pred_dict
calib = batch_dict['calib'][batch_index]
image_shape = batch_dict['image_shape'][batch_index]
pred_boxes_camera = box_utils.boxes3d_lidar_to_kitti_camera(pred_boxes, calib)
pred_boxes_img = box_utils.boxes3d_kitti_camera_to_imageboxes(
pred_boxes_camera, calib, image_shape=image_shape
)
pred_dict['name'] = np.array(class_names)[pred_labels - 1]
pred_dict['alpha'] = -np.arctan2(-pred_boxes[:, 1], pred_boxes[:, 0]) + pred_boxes_camera[:, 6]
pred_dict['bbox'] = pred_boxes_img
pred_dict['dimensions'] = pred_boxes_camera[:, 3:6]
pred_dict['location'] = pred_boxes_camera[:, 0:3]
pred_dict['rotation_y'] = pred_boxes_camera[:, 6]
pred_dict['score'] = pred_scores
pred_dict['boxes_lidar'] = pred_boxes
return pred_dict
annos = []
for index, box_dict in enumerate(pred_dicts):
frame_id = batch_dict['frame_id'][index]
single_pred_dict = generate_single_sample_dict(index, box_dict)
single_pred_dict['frame_id'] = frame_id
annos.append(single_pred_dict)
if output_path is not None:
cur_det_file = output_path / ('%s.txt' % frame_id)
with open(cur_det_file, 'w') as f:
bbox = single_pred_dict['bbox']
loc = single_pred_dict['location']
dims = single_pred_dict['dimensions'] # lhw -> hwl
for idx in range(len(bbox)):
print('%s -1 -1 %.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f %.4f'
% (single_pred_dict['name'][idx], single_pred_dict['alpha'][idx],
bbox[idx][0], bbox[idx][1], bbox[idx][2], bbox[idx][3],
dims[idx][1], dims[idx][2], dims[idx][0], loc[idx][0],
loc[idx][1], loc[idx][2], single_pred_dict['rotation_y'][idx],
single_pred_dict['score'][idx]), file=f)
return annos
def evaluation(self, det_annos, class_names, **kwargs):
assert 'annos' in self.kitti_infos[0].keys()
from .kitti_object_eval_python import eval as kitti_eval
if 'annos' not in self.kitti_infos[0]:
return 'None', {}
eval_det_annos = copy.deepcopy(det_annos)
eval_gt_annos = [copy.deepcopy(info['annos']) for info in self.kitti_infos]
ap_result_str, ap_dict = kitti_eval.get_official_eval_result(eval_gt_annos, eval_det_annos, class_names)
return ap_result_str, ap_dict
def __len__(self):
return len(self.kitti_infos)
def __getitem__(self, index):
# index = 4
info = copy.deepcopy(self.kitti_infos[index])
sample_idx = info['point_cloud']['lidar_idx']
points = self.get_lidar(sample_idx)
calib = self.get_calib(sample_idx)
img_shape = info['image']['image_shape']
if self.dataset_cfg.FOV_POINTS_ONLY:
pts_rect = calib.lidar_to_rect(points[:, 0:3])
fov_flag = self.get_fov_flag(pts_rect, img_shape, calib)
points = points[fov_flag]
input_dict = {
'points': points,
'frame_id': sample_idx,
'calib': calib,
}
if 'annos' in info:
annos = info['annos']
annos = common_utils.drop_info_with_name(annos, name='DontCare')
loc, dims, rots = annos['location'], annos['dimensions'], annos['rotation_y']
gt_names = annos['name']
gt_boxes_camera = np.concatenate([loc, dims, rots[..., np.newaxis]], axis=1).astype(np.float32)
gt_boxes_lidar = box_utils.boxes3d_kitti_camera_to_lidar(gt_boxes_camera, calib)
input_dict.update({
'gt_names': gt_names,
'gt_boxes': gt_boxes_lidar
})
road_plane = self.get_road_plane(sample_idx)
if road_plane is not None:
input_dict['road_plane'] = road_plane
data_dict = self.prepare_data(data_dict=input_dict)
data_dict['image_shape'] = img_shape
return data_dict
def create_kitti_infos(data_path, save_path, workers=4):
pass
if __name__ == '__main__':
pass
from functools import partial
import numpy as np
from ...utils import box_utils, common_utils
def register_function_processor(src_func):
def kernel_func(self, data_dict=None, config=None):
if data_dict is None:
return partial(src_func, self=self, config=config)
return src_func
return kernel_func
class DataProcessor(object):
def __init__(self, processor_configs, point_cloud_range, training):
self.point_cloud_range = point_cloud_range
self.training = training
self.mode = 'train' if training else 'test'
self.grid_size = self.voxel_size = None
self.data_processor_queue = []
for cur_cfg in processor_configs:
cur_processor = getattr(self, cur_cfg.NAME)(config=cur_cfg)
self.data_processor_queue.append(cur_processor)
@register_function_processor
def mask_points_and_boxes_outside_range(self, data_dict=None, config=None):
mask = common_utils.mask_points_by_range(data_dict['points'], self.point_cloud_range)
data_dict['points'] = data_dict['points'][mask]
if data_dict.get('gt_boxes', None) is not None and config.REMOVE_OUTSIDE_BOXES and self.training:
mask = box_utils.mask_boxes_outside_range_numpy(
data_dict['gt_boxes'], self.point_cloud_range, min_num_corners=config.get('min_num_corners', 1)
)
data_dict['gt_boxes'] = data_dict['gt_boxes'][mask]
return data_dict
@register_function_processor
def shuffle_points(self, data_dict=None, config=None):
if data_dict is None:
return partial(self.shuffle_points, config=config)
if config.SHUFFLE_ENABLED[self.mode]:
points = data_dict['points']
shuffle_idx = np.random.permutation(points.shape[0])
points = points[shuffle_idx]
data_dict['points'] = points
return data_dict
def transform_points_to_voxels(self, data_dict=None, config=None, voxel_generator=None):
if data_dict is None:
from spconv.utils import VoxelGenerator
voxel_generator = VoxelGenerator(
voxel_size=config.VOXEL_SIZE,
point_cloud_range=self.point_cloud_range,
max_num_points=config.MAX_POINTS_PER_VOXEL,
max_voxels=config.MAX_NUMBER_OF_VOXELS[self.mode]
)
grid_size = (self.point_cloud_range[3:6] - self.point_cloud_range[0:3]) / np.array(config.VOXEL_SIZE)
self.grid_size = np.round(grid_size).astype(np.int64)
self.voxel_size = config.VOXEL_SIZE
return partial(self.transform_points_to_voxels, voxel_generator=voxel_generator)
points = data_dict['points']
voxels, coordinates, num_points = voxel_generator.generate(points)
if not data_dict['use_lead_xyz']:
voxels = voxels[..., 3:] # remove xyz in voxels(N, 3)
data_dict['voxels'] = voxels
data_dict['voxel_coords'] = coordinates
data_dict['voxel_num_points'] = num_points
return data_dict
def forward(self, data_dict):
"""
Args:
data_dict:
points: (N, 3 + C_in)
gt_boxes: optional, (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
gt_names: optional, (N), string
...
Returns:
"""
for cur_processor in self.data_processor_queue:
data_dict = cur_processor(data_dict=data_dict)
return data_dict
import numpy as np
class PointFeatureEncoder(object):
def __init__(self, config, point_cloud_range=None):
super().__init__()
self.point_encoding_config = config
assert list(self.point_encoding_config.src_feature_list[0:3]) == ['x', 'y', 'z']
self.used_feature_list = self.point_encoding_config.used_feature_list
self.src_feature_list = self.point_encoding_config.src_feature_list
self.point_cloud_range = point_cloud_range
@property
def num_point_features(self):
return getattr(self, self.point_encoding_config.encoding_type)(points=None)
def forward(self, data_dict):
"""
Args:
data_dict:
points: (N, 3 + C_in)
...
Returns:
data_dict:
points: (N, 3 + C_out),
use_lead_xyz: whether to use xyz as point-wise features
...
"""
data_dict['points'], use_lead_xyz = getattr(self, self.point_encoding_config.encoding_type)(
data_dict['points']
)
data_dict['use_lead_xyz'] = use_lead_xyz
return data_dict
def absolute_coordinates_encoding(self, points=None):
if points is None:
num_output_features = len(self.used_feature_list)
return num_output_features
point_feature_list = [points[:, 0:3]]
for x in self.used_feature_list:
if x in ['x', 'y', 'z']:
continue
idx = self.src_feature_list.index(x)
point_feature_list.append(points[:, idx:idx+1])
point_features = np.concatenate(point_feature_list, axis=1)
return point_features, True
import torch
class ResidualCoder(object):
def __init__(self, code_size=7, **kwargs):
super().__init__()
self.code_size = code_size
@staticmethod
def encode_torch(boxes, anchors):
"""
Args:
boxes: (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
anchors: (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
Returns:
"""
anchors[:, 3:6] = torch.clamp_min(anchors[:, 3:6], min=1e-5)
boxes[:, 3:6] = torch.clamp_min(boxes[:, 3:6], min=1e-5)
xa, ya, za, dxa, dya, dza, ra, *cas = torch.split(anchors, 1, dim=-1)
xg, yg, zg, dxg, dyg, dzg, rg, *cgs = torch.split(boxes, 1, dim=-1)
diagonal = torch.sqrt(dxa ** 2 + dya ** 2)
xt = (xg - xa) / diagonal
yt = (yg - ya) / diagonal
zt = (zg - za) / dza
dxt = torch.log(dxg / dxa)
dyt = torch.log(dyg / dya)
dzt = torch.log(dzg / dza)
rt = rg - ra
cts = [g - a for g, a in zip(cgs, cas)]
return torch.cat([xt, yt, zt, dxt, dyt, dzt, rt, *cts], dim=-1)
@staticmethod
def decode_torch(box_encodings, anchors):
"""
Args:
box_encodings: (B, N, 7 + C) or (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
anchors: (B, N, 7 + C) or (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
Returns:
"""
xa, ya, za, dxa, dya, dza, ra, *cas = torch.split(anchors, 1, dim=-1)
xt, yt, zt, dxt, dyt, dzt, rt, *cts = torch.split(box_encodings, 1, dim=-1)
diagonal = torch.sqrt(dxa ** 2 + dya ** 2)
xg = xt * diagonal + xa
yg = yt * diagonal + ya
zg = zt * dza + za
dxg = torch.exp(dxt) * dxa
dyg = torch.exp(dyt) * dya
dzg = torch.exp(dzt) * dza
rg = rt + ra
cgs = [t + a for t, a in zip(cts, cas)]
return torch.cat([xg, yg, zg, dxg, dyg, dzg, rg, *cgs], dim=-1)
class PreviousResidualDecoder(object):
def __init__(self, code_size=7, **kwargs):
super().__init__()
self.code_size = code_size
@staticmethod
def decode_torch(box_encodings, anchors):
"""
Args:
box_encodings: (B, N, 7 + ?) x, y, z, w, l, h, r, custom values
anchors: (B, N, 7 + C) or (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
Returns:
"""
xa, ya, za, dxa, dya, dza, ra, *cas = torch.split(anchors, 1, dim=-1)
xt, yt, zt, wt, lt, ht, rt, *cts = torch.split(box_encodings, 1, dim=-1)
diagonal = torch.sqrt(dxa ** 2 + dya ** 2)
xg = xt * diagonal + xa
yg = yt * diagonal + ya
zg = zt * dza + za
dxg = torch.exp(lt) * dxa
dyg = torch.exp(wt) * dya
dzg = torch.exp(ht) * dza
rg = rt + ra
cgs = [t + a for t, a in zip(cts, cas)]
return torch.cat([xg, yg, zg, dxg, dyg, dzg, rg, *cgs], dim=-1)
class PreviousResidualRoIDecoder(object):
def __init__(self, code_size=7, **kwargs):
super().__init__()
self.code_size = code_size
@staticmethod
def decode_torch(box_encodings, anchors):
"""
Args:
box_encodings: (B, N, 7 + ?) x, y, z, w, l, h, r, custom values
anchors: (B, N, 7 + C) or (N, 7 + C) [x, y, z, dx, dy, dz, heading, ...]
Returns:
"""
xa, ya, za, dxa, dya, dza, ra, *cas = torch.split(anchors, 1, dim=-1)
xt, yt, zt, wt, lt, ht, rt, *cts = torch.split(box_encodings, 1, dim=-1)
diagonal = torch.sqrt(dxa ** 2 + dya ** 2)
xg = xt * diagonal + xa
yg = yt * diagonal + ya
zg = zt * dza + za
dxg = torch.exp(lt) * dxa
dyg = torch.exp(wt) * dya
dzg = torch.exp(ht) * dza
rg = ra - rt
cgs = [t + a for t, a in zip(cts, cas)]
return torch.cat([xg, yg, zg, dxg, dyg, dzg, rg, *cgs], dim=-1)
import numpy as np
import torch
from . import common_utils
from ..ops.roiaware_pool3d import roiaware_pool3d_utils
def boxes_to_corners_3d(boxes3d):
"""
7 -------- 4
/| /|
6 -------- 5 .
| | | |
. 3 -------- 0
|/ |/
2 -------- 1
Args:
boxes3d: (N, 7) [x, y, z, dx, dy, dz, heading], (x, y, z) is the box center
Returns:
"""
boxes3d, is_numpy = common_utils.check_numpy_to_torch(boxes3d)
template = boxes3d.new_tensor((
[1, 1, -1], [1, -1, -1], [-1, -1, -1], [-1, 1, -1],
[1, 1, 1], [1, -1, 1], [-1, -1, 1], [-1, 1, 1],
)) / 2
corners3d = boxes3d[:, None, 3:6].repeat(1, 8, 1) * template[None, :, :]
corners3d = common_utils.rotate_points_along_z(corners3d.view(-1, 8, 3), boxes3d[:, 6]).view(-1, 8, 3)
corners3d += boxes3d[:, None, 0:3]
return corners3d.numpy() if is_numpy else corners3d
def mask_boxes_outside_range_numpy(boxes, limit_range, min_num_corners=1):
"""
Args:
boxes: (N, 7) [x, y, z, dx, dy, dz, heading, ...], (x, y, z) is the box center
limit_range: [minx, miny, minz, maxx, maxy, maxz]
min_num_corners:
Returns:
"""
if boxes.shape[1] > 7:
boxes = boxes[:, 0:7]
corners = boxes_to_corners_3d(boxes) # (N, 8, 3)
mask = ((corners >= limit_range[0:3]) & (corners <= limit_range[3:6])).all(axis=2)
mask = mask.sum(axis=1) >= min_num_corners # (N)
return mask
def remove_points_in_boxes3d(points, boxes3d):
"""
Args:
points: (num_points, 3 + C)
boxes3d: (N, 7) [x, y, z, dx, dy, dz, heading], (x, y, z) is the box center, each box DO NOT overlaps
Returns:
"""
boxes3d, is_numpy = common_utils.check_numpy_to_torch(boxes3d)
points, is_numpy = common_utils.check_numpy_to_torch(points)
point_masks = roiaware_pool3d_utils.points_in_boxes_cpu(points[:, 0:3], boxes3d)
points = points[point_masks.sum(dim=0) == 0]
return points.numpy() if is_numpy else points
def boxes3d_kitti_camera_to_lidar(boxes3d_camera, calib):
"""
Args:
boxes3d_camera: (N, 7) [x, y, z, l, h, w, r] in rect camera coords
calib:
Returns:
boxes3d_lidar: [x, y, z, dx, dy, dz, heading], (x, y, z) is the box center
"""
xyz_camera = boxes3d_camera[:, 0:3]
l, h, w, r = boxes3d_camera[:, 3:4], boxes3d_camera[:, 4:5], boxes3d_camera[:, 5:6], boxes3d_camera[:, 6:7]
xyz_lidar = calib.rect_to_lidar(xyz_camera)
xyz_lidar[:, 2] += h[:, 0] / 2
return np.concatenate([xyz_lidar, l, w, h, -(r + np.pi / 2)], axis=-1)
def boxes3d_kitti_fakelidar_to_lidar(boxes3d_lidar):
"""
Args:
boxes3d_fakelidar: (N, 7) [x, y, z, w, l, h, r] in old LiDAR coordinates, z is bottom center
Returns:
boxes3d_lidar: [x, y, z, dx, dy, dz, heading], (x, y, z) is the box center
"""
w, l, h, r = boxes3d_lidar[:, 3:4], boxes3d_lidar[:, 4:5], boxes3d_lidar[:, 5:6], boxes3d_lidar[:, 6:7]
boxes3d_lidar[:, 2] += h[:, 0] / 2
return np.concatenate([boxes3d_lidar[:, 0:3], l, w, h, -(r + np.pi / 2)], axis=-1)
def boxes3d_kitti_lidar_to_fakelidar(boxes3d_lidar):
"""
Args:
boxes3d_lidar: (N, 7) [x, y, z, dx, dy, dz, heading], (x, y, z) is the box center
Returns:
boxes3d_fakelidar: [x, y, z, w, l, h, r] in old LiDAR coordinates, z is bottom center
"""
dx, dy, dz, heading = boxes3d_lidar[:, 3:4], boxes3d_lidar[:, 4:5], boxes3d_lidar[:, 5:6], boxes3d_lidar[:, 6:7]
boxes3d_lidar[:, 2] -= dz[:, 0] / 2
return np.concatenate([boxes3d_lidar[:, 0:3], dy, dx, dz, -heading - np.pi / 2], axis=-1)
def enlarge_box3d(boxes3d, extra_width=(0, 0, 0)):
"""
Args:
boxes3d: [x, y, z, dx, dy, dz, heading], (x, y, z) is the box center
extra_width: [extra_x, extra_y, extra_z]
Returns:
"""
boxes3d, is_numpy = common_utils.check_numpy_to_torch(boxes3d)
large_boxes3d = boxes3d.clone()
large_boxes3d[:, 3:6] += boxes3d.new_tensor(extra_width)[None, :]
return large_boxes3d
def boxes3d_lidar_to_kitti_camera(boxes3d_lidar, calib):
"""
:param boxes3d_lidar: (N, 7) [x, y, z, dx, dy, dz, heading], (x, y, z) is the box center
:param calib:
:return:
boxes3d_camera: (N, 7) [x, y, z, l, h, w, r] in rect camera coords
"""
xyz_lidar = boxes3d_lidar[:, 0:3]
l, w, h, r = boxes3d_lidar[:, 3:4], boxes3d_lidar[:, 4:5], boxes3d_lidar[:, 5:6], boxes3d_lidar[:, 6:7]
xyz_lidar[:, 2] -= h.reshape(-1) / 2
xyz_cam = calib.lidar_to_rect(xyz_lidar)
# xyz_cam[:, 1] += h.reshape(-1) / 2
r = -r - np.pi / 2
return np.concatenate([xyz_cam, l, h, w, r], axis=-1)
def boxes3d_to_corners3d_kitti_camera(boxes3d, bottom_center=True):
"""
:param boxes3d: (N, 7) [x, y, z, l, h, w, ry] in camera coords, see the definition of ry in KITTI dataset
:param bottom_center: whether y is on the bottom center of object
:return: corners3d: (N, 8, 3)
7 -------- 4
/| /|
6 -------- 5 .
| | | |
. 3 -------- 0
|/ |/
2 -------- 1
"""
boxes_num = boxes3d.shape[0]
l, h, w = boxes3d[:, 3], boxes3d[:, 4], boxes3d[:, 5]
x_corners = np.array([l / 2., l / 2., -l / 2., -l / 2., l / 2., l / 2., -l / 2., -l / 2], dtype=np.float32).T
z_corners = np.array([w / 2., -w / 2., -w / 2., w / 2., w / 2., -w / 2., -w / 2., w / 2.], dtype=np.float32).T
if bottom_center:
y_corners = np.zeros((boxes_num, 8), dtype=np.float32)
y_corners[:, 4:8] = -h.reshape(boxes_num, 1).repeat(4, axis=1) # (N, 8)
else:
y_corners = np.array([h / 2., h / 2., h / 2., h / 2., -h / 2., -h / 2., -h / 2., -h / 2.], dtype=np.float32).T
ry = boxes3d[:, 6]
zeros, ones = np.zeros(ry.size, dtype=np.float32), np.ones(ry.size, dtype=np.float32)
rot_list = np.array([[np.cos(ry), zeros, -np.sin(ry)],
[zeros, ones, zeros],
[np.sin(ry), zeros, np.cos(ry)]]) # (3, 3, N)
R_list = np.transpose(rot_list, (2, 0, 1)) # (N, 3, 3)
temp_corners = np.concatenate((x_corners.reshape(-1, 8, 1), y_corners.reshape(-1, 8, 1),
z_corners.reshape(-1, 8, 1)), axis=2) # (N, 8, 3)
rotated_corners = np.matmul(temp_corners, R_list) # (N, 8, 3)
x_corners, y_corners, z_corners = rotated_corners[:, :, 0], rotated_corners[:, :, 1], rotated_corners[:, :, 2]
x_loc, y_loc, z_loc = boxes3d[:, 0], boxes3d[:, 1], boxes3d[:, 2]
x = x_loc.reshape(-1, 1) + x_corners.reshape(-1, 8)
y = y_loc.reshape(-1, 1) + y_corners.reshape(-1, 8)
z = z_loc.reshape(-1, 1) + z_corners.reshape(-1, 8)
corners = np.concatenate((x.reshape(-1, 8, 1), y.reshape(-1, 8, 1), z.reshape(-1, 8, 1)), axis=2)
return corners.astype(np.float32)
def boxes3d_kitti_camera_to_imageboxes(boxes3d, calib, image_shape=None):
"""
:param boxes3d: (N, 7) [x, y, z, l, h, w, r] in rect camera coords
:param calib:
:return:
box_2d_preds: (N, 4) [x1, y1, x2, y2]
"""
corners3d = boxes3d_to_corners3d_kitti_camera(boxes3d)
pts_img, _ = calib.rect_to_img(corners3d.reshape(-1, 3))
corners_in_image = pts_img.reshape(-1, 8, 2)
min_uv = np.min(corners_in_image, axis=1) # (N, 2)
max_uv = np.max(corners_in_image, axis=1) # (N, 2)
boxes2d_image = np.concatenate([min_uv, max_uv], axis=1)
if image_shape is not None:
boxes2d_image[:, 0] = np.clip(boxes2d_image[:, 0], a_min=0, a_max=image_shape[1] - 1)
boxes2d_image[:, 1] = np.clip(boxes2d_image[:, 1], a_min=0, a_max=image_shape[0] - 1)
boxes2d_image[:, 2] = np.clip(boxes2d_image[:, 2], a_min=0, a_max=image_shape[1] - 1)
boxes2d_image[:, 3] = np.clip(boxes2d_image[:, 3], a_min=0, a_max=image_shape[0] - 1)
return boxes2d_image
def boxes_iou_normal(boxes_a, boxes_b):
"""
Args:
boxes_a: (N, 4) [x1, y1, x2, y2]
boxes_b: (M, 4) [x1, y1, x2, y2]
Returns:
"""
assert boxes_a.shape[1] == boxes_b.shape[1] == 4
x_min = torch.max(boxes_a[:, 0, None], boxes_b[None, :, 0])
x_max = torch.min(boxes_a[:, 2, None], boxes_b[None, :, 2])
y_min = torch.max(boxes_a[:, 1, None], boxes_b[None, :, 1])
y_max = torch.min(boxes_a[:, 3, None], boxes_b[None, :, 3])
x_len = torch.clamp_min(x_max - x_min, min=0)
y_len = torch.clamp_min(y_max - y_min, min=0)
area_a = (boxes_a[:, 2] - boxes_a[:, 0]) * (boxes_a[:, 3] - boxes_a[:, 1])
area_b = (boxes_b[:, 2] - boxes_b[:, 0]) * (boxes_b[:, 3] - boxes_b[:, 1])
a_intersect_b = x_len * y_len
iou = a_intersect_b / torch.clamp_min(area_a[:, None] + area_b[None, :] - a_intersect_b, min=1e-6)
return iou
def boxes3d_lidar_to_aligned_bev_boxes(boxes3d):
"""
Args:
boxes3d: (N, 7 + C) [x, y, z, dx, dy, dz, heading] in lidar coordinate
Returns:
aligned_bev_boxes: (N, 4) [x1, y1, x2, y2] in the above lidar coordinate
"""
rot_angle = common_utils.limit_period(boxes3d[:, 6], offset=0.5, period=np.pi).abs()
choose_dims = torch.where(rot_angle[:, None] < np.pi / 4, boxes3d[:, [3, 4]], boxes3d[:, [4, 3]])
aligned_bev_boxes = torch.cat((boxes3d[:, 0:2] - choose_dims / 2, boxes3d[:, 0:2] + choose_dims / 2), dim=1)
return aligned_bev_boxes
def boxes3d_nearest_bev_iou(boxes_a, boxes_b):
"""
Args:
boxes_a: (N, 7) [x, y, z, dx, dy, dz, heading]
boxes_b: (N, 7) [x, y, z, dx, dy, dz, heading]
Returns:
"""
boxes_bev_a = boxes3d_lidar_to_aligned_bev_boxes(boxes_a)
boxes_bev_b = boxes3d_lidar_to_aligned_bev_boxes(boxes_b)
return boxes_iou_normal(boxes_bev_a, boxes_bev_b)
import numpy as np
def get_calib_from_file(calib_file):
with open(calib_file) as f:
lines = f.readlines()
obj = lines[2].strip().split(' ')[1:]
P2 = np.array(obj, dtype=np.float32)
obj = lines[3].strip().split(' ')[1:]
P3 = np.array(obj, dtype=np.float32)
obj = lines[4].strip().split(' ')[1:]
R0 = np.array(obj, dtype=np.float32)
obj = lines[5].strip().split(' ')[1:]
Tr_velo_to_cam = np.array(obj, dtype=np.float32)
return {'P2': P2.reshape(3, 4),
'P3': P3.reshape(3, 4),
'R0': R0.reshape(3, 3),
'Tr_velo2cam': Tr_velo_to_cam.reshape(3, 4)}
class Calibration(object):
def __init__(self, calib_file):
if not isinstance(calib_file, dict):
calib = get_calib_from_file(calib_file)
else:
calib = calib_file
self.P2 = calib['P2'] # 3 x 4
self.R0 = calib['R0'] # 3 x 3
self.V2C = calib['Tr_velo2cam'] # 3 x 4
# Camera intrinsics and extrinsics
self.cu = self.P2[0, 2]
self.cv = self.P2[1, 2]
self.fu = self.P2[0, 0]
self.fv = self.P2[1, 1]
self.tx = self.P2[0, 3] / (-self.fu)
self.ty = self.P2[1, 3] / (-self.fv)
def cart_to_hom(self, pts):
"""
:param pts: (N, 3 or 2)
:return pts_hom: (N, 4 or 3)
"""
pts_hom = np.hstack((pts, np.ones((pts.shape[0], 1), dtype=np.float32)))
return pts_hom
def rect_to_lidar(self, pts_rect):
"""
:param pts_lidar: (N, 3)
:return pts_rect: (N, 3)
"""
pts_rect_hom = self.cart_to_hom(pts_rect) # (N, 4)
R0_ext = np.hstack((self.R0, np.zeros((3, 1), dtype=np.float32))) # (3, 4)
R0_ext = np.vstack((R0_ext, np.zeros((1, 4), dtype=np.float32))) # (4, 4)
R0_ext[3, 3] = 1
V2C_ext = np.vstack((self.V2C, np.zeros((1, 4), dtype=np.float32))) # (4, 4)
V2C_ext[3, 3] = 1
pts_lidar = np.dot(pts_rect_hom, np.linalg.inv(np.dot(R0_ext, V2C_ext).T))
return pts_lidar[:, 0:3]
def lidar_to_rect(self, pts_lidar):
"""
:param pts_lidar: (N, 3)
:return pts_rect: (N, 3)
"""
pts_lidar_hom = self.cart_to_hom(pts_lidar)
pts_rect = np.dot(pts_lidar_hom, np.dot(self.V2C.T, self.R0.T))
# pts_rect = reduce(np.dot, (pts_lidar_hom, self.V2C.T, self.R0.T))
return pts_rect
def rect_to_img(self, pts_rect):
"""
:param pts_rect: (N, 3)
:return pts_img: (N, 2)
"""
pts_rect_hom = self.cart_to_hom(pts_rect)
pts_2d_hom = np.dot(pts_rect_hom, self.P2.T)
pts_img = (pts_2d_hom[:, 0:2].T / pts_rect_hom[:, 2]).T # (N, 2)
pts_rect_depth = pts_2d_hom[:, 2] - self.P2.T[3, 2] # depth in rect camera coord
return pts_img, pts_rect_depth
def lidar_to_img(self, pts_lidar):
"""
:param pts_lidar: (N, 3)
:return pts_img: (N, 2)
"""
pts_rect = self.lidar_to_rect(pts_lidar)
pts_img, pts_depth = self.rect_to_img(pts_rect)
return pts_img, pts_depth
def img_to_rect(self, u, v, depth_rect):
"""
:param u: (N)
:param v: (N)
:param depth_rect: (N)
:return:
"""
x = ((u - self.cu) * depth_rect) / self.fu + self.tx
y = ((v - self.cv) * depth_rect) / self.fv + self.ty
pts_rect = np.concatenate((x.reshape(-1, 1), y.reshape(-1, 1), depth_rect.reshape(-1, 1)), axis=1)
return pts_rect
def corners3d_to_img_boxes(self, corners3d):
"""
:param corners3d: (N, 8, 3) corners in rect coordinate
:return: boxes: (None, 4) [x1, y1, x2, y2] in rgb coordinate
:return: boxes_corner: (None, 8) [xi, yi] in rgb coordinate
"""
sample_num = corners3d.shape[0]
corners3d_hom = np.concatenate((corners3d, np.ones((sample_num, 8, 1))), axis=2) # (N, 8, 4)
img_pts = np.matmul(corners3d_hom, self.P2.T) # (N, 8, 3)
x, y = img_pts[:, :, 0] / img_pts[:, :, 2], img_pts[:, :, 1] / img_pts[:, :, 2]
x1, y1 = np.min(x, axis=1), np.min(y, axis=1)
x2, y2 = np.max(x, axis=1), np.max(y, axis=1)
boxes = np.concatenate((x1.reshape(-1, 1), y1.reshape(-1, 1), x2.reshape(-1, 1), y2.reshape(-1, 1)), axis=1)
boxes_corner = np.concatenate((x.reshape(-1, 8, 1), y.reshape(-1, 8, 1)), axis=2)
return boxes, boxes_corner
import numpy as np
import torch
import random
import logging
import os
import torch.multiprocessing as mp
import torch.distributed as dist
import subprocess
def check_numpy_to_torch(x):
if isinstance(x, np.ndarray):
return torch.from_numpy(x).float(), True
return x, False
def limit_period(val, offset=0.5, period=np.pi):
val, is_numpy = check_numpy_to_torch(val)
ans = val - torch.floor(val / period + offset) * period
return ans.numpy() if is_numpy else ans
def drop_info_with_name(info, name):
ret_info = {}
keep_indices = [i for i, x in enumerate(info['name']) if x != name]
for key in info.keys():
ret_info[key] = info[key][keep_indices]
return ret_info
def rotate_points_along_z(points, angle):
"""
Args:
points: (B, N, 3 + C)
angle: (B), angle along z-axis, angle increases x ==> y
Returns:
"""
points, is_numpy = check_numpy_to_torch(points)
angle, _ = check_numpy_to_torch(angle)
cosa = torch.cos(angle)
sina = torch.sin(angle)
zeros = angle.new_zeros(points.shape[0])
ones = angle.new_ones(points.shape[0])
rot_matrix = torch.stack((
cosa, sina, zeros,
-sina, cosa, zeros,
zeros, zeros, ones
), dim=1).view(-1, 3, 3).float()
points_rot = torch.matmul(points[:, :, 0:3], rot_matrix)
points_rot = torch.cat((points_rot, points[:, :, 3:]), dim=-1)
return points_rot.numpy() if is_numpy else points_rot
def mask_points_by_range(points, limit_range):
mask = (points[:, 0] >= limit_range[0]) & (points[:, 0] <= limit_range[3]) \
& (points[:, 1] >= limit_range[1]) & (points[:, 1] <= limit_range[4])
return mask
def get_voxel_centers(voxel_coords, downsample_times, voxel_size, point_cloud_range):
"""
Args:
voxel_coords: (N, 3)
downsample_times:
voxel_size:
point_cloud_range:
Returns:
"""
assert voxel_coords.shape[1] == 3
voxel_centers = voxel_coords[:, [2, 1, 0]].float() # (xyz)
voxel_size = torch.tensor(voxel_size, device=voxel_centers.device).float() * downsample_times
pc_range = torch.tensor(point_cloud_range[0:3], device=voxel_centers.device).float()
voxel_centers = (voxel_centers + 0.5) * voxel_size + pc_range
return voxel_centers
def create_logger(log_file, rank=0, log_level=logging.INFO):
logger = logging.getLogger(__name__)
logger.setLevel(log_level if rank == 0 else 'ERROR')
formatter = logging.Formatter('%(asctime)s %(levelname)5s %(message)s')
console = logging.StreamHandler()
console.setLevel(log_level if rank == 0 else 'ERROR')
console.setFormatter(formatter)
file_handler = logging.FileHandler(filename=log_file)
file_handler.setLevel(log_level if rank == 0 else 'ERROR')
file_handler.setFormatter(formatter)
logger.addHandler(console)
logger.addHandler(file_handler)
return logger
def set_random_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def keep_arrays_by_name(gt_names, used_classes):
inds = [i for i, x in enumerate(gt_names) if x in used_classes]
inds = np.array(inds, dtype=np.int64)
return inds
def init_dist_slurm(batch_size, tcp_port, local_rank, backend='nccl'):
"""
modified from https://github.com/open-mmlab/mmdetection
Args:
batch_size:
tcp_port:
backend:
Returns:
"""
proc_id = int(os.environ['SLURM_PROCID'])
ntasks = int(os.environ['SLURM_NTASKS'])
node_list = os.environ['SLURM_NODELIST']
num_gpus = torch.cuda.device_count()
torch.cuda.set_device(proc_id % num_gpus)
addr = subprocess.getoutput('scontrol show hostname {} | head -n1'.format(node_list))
os.environ['MASTER_PORT'] = str(tcp_port)
os.environ['MASTER_ADDR'] = addr
os.environ['WORLD_SIZE'] = str(ntasks)
os.environ['RANK'] = str(proc_id)
dist.init_process_group(backend=backend)
total_gpus = dist.get_world_size()
assert batch_size % total_gpus == 0, 'Batch size should be matched with GPUS: (%d, %d)' % (batch_size, total_gpus)
batch_size_each_gpu = batch_size // total_gpus
rank = dist.get_rank()
return batch_size_each_gpu, rank
def init_dist_pytorch(batch_size, tcp_port, local_rank, backend='nccl'):
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method('spawn')
num_gpus = torch.cuda.device_count()
torch.cuda.set_device(local_rank % num_gpus)
dist.init_process_group(
backend=backend,
init_method='tcp://127.0.0.1:%d' % tcp_port,
rank=local_rank,
world_size=num_gpus
)
assert batch_size % num_gpus == 0, 'Batch size should be matched with GPUS: (%d, %d)' % (batch_size, num_gpus)
batch_size_each_gpu = batch_size // num_gpus
rank = dist.get_rank()
return batch_size_each_gpu, rank
import numpy as np
def get_objects_from_label(label_file):
with open(label_file, 'r') as f:
lines = f.readlines()
objects = [Object3d(line) for line in lines]
return objects
def cls_type_to_id(cls_type):
type_to_id = {'Car': 1, 'Pedestrian': 2, 'Cyclist': 3, 'Van': 4}
if cls_type not in type_to_id.keys():
return -1
return type_to_id[cls_type]
class Object3d(object):
def __init__(self, line):
label = line.strip().split(' ')
self.src = line
self.cls_type = label[0]
self.cls_id = cls_type_to_id(self.cls_type)
self.truncation = float(label[1])
self.occlusion = float(label[2]) # 0:fully visible 1:partly occluded 2:largely occluded 3:unknown
self.alpha = float(label[3])
self.box2d = np.array((float(label[4]), float(label[5]), float(label[6]), float(label[7])), dtype=np.float32)
self.h = float(label[8])
self.w = float(label[9])
self.l = float(label[10])
self.loc = np.array((float(label[11]), float(label[12]), float(label[13])), dtype=np.float32)
self.dis_to_cam = np.linalg.norm(self.loc)
self.ry = float(label[14])
self.score = float(label[15]) if label.__len__() == 16 else -1.0
self.level_str = None
self.level = self.get_kitti_obj_level()
def get_kitti_obj_level(self):
height = float(self.box2d[3]) - float(self.box2d[1]) + 1
if height >= 40 and self.truncation <= 0.15 and self.occlusion <= 0:
self.level_str = 'Easy'
return 0 # Easy
elif height >= 25 and self.truncation <= 0.3 and self.occlusion <= 1:
self.level_str = 'Moderate'
return 1 # Moderate
elif height >= 25 and self.truncation <= 0.5 and self.occlusion <= 2:
self.level_str = 'Hard'
return 2 # Hard
else:
self.level_str = 'UnKnown'
return -1
def generate_corners3d(self):
"""
generate corners3d representation for this object
:return corners_3d: (8, 3) corners of box3d in camera coord
"""
l, h, w = self.l, self.h, self.w
x_corners = [l / 2, l / 2, -l / 2, -l / 2, l / 2, l / 2, -l / 2, -l / 2]
y_corners = [0, 0, 0, 0, -h, -h, -h, -h]
z_corners = [w / 2, -w / 2, -w / 2, w / 2, w / 2, -w / 2, -w / 2, w / 2]
R = np.array([[np.cos(self.ry), 0, np.sin(self.ry)],
[0, 1, 0],
[-np.sin(self.ry), 0, np.cos(self.ry)]])
corners3d = np.vstack([x_corners, y_corners, z_corners]) # (3, 8)
corners3d = np.dot(R, corners3d).T
corners3d = corners3d + self.loc
return corners3d
def to_str(self):
print_str = '%s %.3f %.3f %.3f box2d: %s hwl: [%.3f %.3f %.3f] pos: %s ry: %.3f' \
% (self.cls_type, self.truncation, self.occlusion, self.alpha, self.box2d, self.h, self.w, self.l,
self.loc, self.ry)
return print_str
def to_kitti_format(self):
kitti_str = '%s %.2f %d %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f' \
% (self.cls_type, self.truncation, int(self.occlusion), self.alpha, self.box2d[0], self.box2d[1],
self.box2d[2], self.box2d[3], self.h, self.w, self.l, self.loc[0], self.loc[1], self.loc[2],
self.ry)
return kitti_str
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment