Commit edb6b369 authored by ChaimZhu

fix inference

parent 27a546e9
# Copyright (c) OpenMMLab. All rights reserved.
-import re
+import warnings
from copy import deepcopy
from os import path as osp
+from typing import Sequence, Union

import mmcv
+import mmengine
import numpy as np
import torch
-from mmcv.parallel import collate, scatter
-from mmcv.runner import load_checkpoint
+import torch.nn as nn
+from mmengine.dataset import Compose
+from mmengine.runner import load_checkpoint

-from mmdet3d.core import Box3DMode
+from mmdet3d.core import Box3DMode, Det3DDataSample, SampleList
from mmdet3d.core.bbox import get_box_type
-from mmdet3d.datasets.pipelines import Compose
from mmdet3d.models import build_model
-from mmdet3d.utils import get_root_logger
def convert_SyncBN(config):
@@ -37,7 +38,7 @@ def init_model(config, checkpoint=None, device='cuda:0'):
        3D segmentor.

    Args:
-        config (str or :obj:`mmcv.Config`): Config file path or the config
+        config (str or :obj:`mmengine.Config`): Config file path or the config
            object.
        checkpoint (str, optional): Checkpoint path. If left as None, the model
            will not load any weights.
@@ -47,14 +48,14 @@ def init_model(config, checkpoint=None, device='cuda:0'):
        nn.Module: The constructed detector.
    """
    if isinstance(config, str):
-        config = mmcv.Config.fromfile(config)
-    elif not isinstance(config, mmcv.Config):
+        config = mmengine.Config.fromfile(config)
+    elif not isinstance(config, mmengine.Config):
        raise TypeError('config must be a filename or Config object, '
                        f'but got {type(config)}')
    config.model.pretrained = None
    convert_SyncBN(config.model)
    config.model.train_cfg = None
-    model = build_model(config.model, test_cfg=config.get('test_cfg'))
+    model = build_model(config.model)
    if checkpoint is not None:
        checkpoint = load_checkpoint(model, checkpoint, map_location='cpu')
        if 'CLASSES' in checkpoint['meta']:
@@ -67,256 +68,249 @@ def init_model(config, checkpoint=None, device='cuda:0'):
    if device != 'cpu':
        torch.cuda.set_device(device)
    else:
-        logger = get_root_logger()
-        logger.warning('Don\'t suggest using CPU device. '
+        warnings.warn('Don\'t suggest using CPU device. '
                      'Some functions are not supported for now.')
    model.to(device)
    model.eval()
    return model
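
For orientation, a minimal usage sketch of init_model as changed above; the config and checkpoint paths are hypothetical placeholders, not part of this commit:

from mmdet3d.apis import init_model

# hypothetical paths; substitute any mmdet3d config/checkpoint pair
config_file = 'configs/pointpillars/pointpillars_kitti-3d-car.py'
checkpoint_file = 'checkpoints/pointpillars_kitti-3d-car.pth'
model = init_model(config_file, checkpoint_file, device='cuda:0')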
-def inference_detector(model, pcd):
+PointsType = Union[str, np.ndarray, Sequence[str], Sequence[np.ndarray]]
+ImagesType = Union[str, np.ndarray, Sequence[str], Sequence[np.ndarray]]
+
+
+def inference_detector(model: nn.Module,
+                       pcds: PointsType) -> Union[Det3DDataSample, SampleList]:
"""Inference point cloud with the detector.
Args:
model (nn.Module): The loaded detector.
pcd (str): Point cloud files.
pcds (str, ndarray, Sequence[str/ndarray]):
Either point cloud files or loaded point cloud.
Returns:
tuple: Predicted results and data from pipeline.
:obj:`Det3DDataSample` or list[:obj:`Det3DDataSample`]:
If pcds is a list or tuple, the same length list type results
will be returned, otherwise return the detection results directly.
"""
+    if isinstance(pcds, (list, tuple)):
+        is_batch = True
+    else:
+        pcds = [pcds]
+        is_batch = False
+
    cfg = model.cfg
    device = next(model.parameters()).device  # model device
-    if not isinstance(pcd, str):
+    if not isinstance(pcds[0], str):
        cfg = cfg.copy()
        # set loading pipeline type
-        cfg.data.test.pipeline[0].type = 'LoadPointsFromDict'
+        cfg.test_dataloader.dataset.pipeline[0].type = 'LoadPointsFromDict'

    # build the data pipeline
-    test_pipeline = deepcopy(cfg.data.test.pipeline)
+    test_pipeline = deepcopy(cfg.test_dataloader.dataset.pipeline)
    test_pipeline = Compose(test_pipeline)
-    box_type_3d, box_mode_3d = get_box_type(cfg.data.test.box_type_3d)
+    # box_type_3d, box_mode_3d = get_box_type(
+    #     cfg.test_dataloader.dataset.box_type_3d)
+    data = []
+    for pcd in pcds:
        # prepare data
        if isinstance(pcd, str):
-            # load from point clouds file
-            data = dict(
-                pts_filename=pcd,
-                box_type_3d=box_type_3d,
-                box_mode_3d=box_mode_3d,
+            # load from point cloud file
+            data_ = dict(
+                lidar_points=dict(lidar_path=pcd),
                # for ScanNet demo we need axis_align_matrix
                ann_info=dict(axis_align_matrix=np.eye(4)),
                sweeps=[],
                # set timestamp = 0
-                timestamp=[0],
-                img_fields=[],
-                bbox3d_fields=[],
-                pts_mask_fields=[],
-                pts_seg_fields=[],
-                bbox_fields=[],
-                mask_fields=[],
-                seg_fields=[])
+                timestamp=[0])
        else:
-            # load from http
-            data = dict(
+            # directly use loaded point cloud
+            data_ = dict(
                points=pcd,
-                box_type_3d=box_type_3d,
-                box_mode_3d=box_mode_3d,
                # for ScanNet demo we need axis_align_matrix
                ann_info=dict(axis_align_matrix=np.eye(4)),
                sweeps=[],
                # set timestamp = 0
-                timestamp=[0],
-                img_fields=[],
-                bbox3d_fields=[],
-                pts_mask_fields=[],
-                pts_seg_fields=[],
-                bbox_fields=[],
-                mask_fields=[],
-                seg_fields=[])
-    data = test_pipeline(data)
-    data = collate([data], samples_per_gpu=1)
-    if next(model.parameters()).is_cuda:
-        # scatter to specified GPU
-        data = scatter(data, [device.index])[0]
-    else:
-        # this is a workaround to avoid the bug of MMDataParallel
-        data['img_metas'] = data['img_metas'][0].data
-        data['points'] = data['points'][0].data
+                timestamp=[0])
+        data_ = test_pipeline(data_)
+        data.append(data_)
    # forward the model
    with torch.no_grad():
-        result = model(return_loss=False, rescale=True, **data)
-    return result, data
+        results = model.test_step(data)

+    if not is_batch:
+        return results[0]
+    else:
+        return results
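
The rewritten inference_detector is batch-aware: a single path or array yields one Det3DDataSample, while a list yields a list of the same length. A sketch with hypothetical file paths, reusing the model built in the init_model sketch above:

from mmdet3d.apis import inference_detector

# single input -> one Det3DDataSample
result = inference_detector(model, 'demo/data/kitti/000008.bin')

# list input -> list of Det3DDataSample with the same length
results = inference_detector(model, ['000008.bin', '000010.bin'])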
-def inference_multi_modality_detector(model, pcd, image, ann_file):
+def inference_multi_modality_detector(model: nn.Module,
+                                      pcds: Union[str, Sequence[str]],
+                                      imgs: Union[str, Sequence[str]],
+                                      ann_files: Union[str, Sequence[str]]):
"""Inference point cloud with the multi-modality detector.
Args:
model (nn.Module): The loaded detector.
pcd (str): Point cloud files.
image (str): Image files.
ann_file (str): Annotation files.
pcds (str, Sequence[str]):
Either point cloud files or loaded point cloud.
imgs (str, Sequence[str]):
Either image files or loaded images.
ann_files (str, Sequence[str]): Annotation files.
Returns:
tuple: Predicted results and data from pipeline.
:obj:`Det3DDataSample` or list[:obj:`Det3DDataSample`]:
If pcds is a list or tuple, the same length list type results
will be returned, otherwise return the detection results directly.
"""
+    # TODO: We will support
+    if isinstance(pcds, (list, tuple)):
+        is_batch = True
+        assert isinstance(imgs, (list, tuple))
+        assert isinstance(ann_files, (list, tuple))
+        assert len(pcds) == len(imgs) == len(ann_files)
+    else:
+        pcds = [pcds]
+        imgs = [imgs]
+        ann_files = [ann_files]
+        is_batch = False
+
    cfg = model.cfg
    device = next(model.parameters()).device  # model device
    # build the data pipeline
-    test_pipeline = deepcopy(cfg.data.test.pipeline)
+    test_pipeline = deepcopy(cfg.test_dataloader.dataset.pipeline)
    test_pipeline = Compose(test_pipeline)
-    box_type_3d, box_mode_3d = get_box_type(cfg.data.test.box_type_3d)
+    box_type_3d, box_mode_3d = \
+        get_box_type(cfg.test_dataloader.dataset.box_type_3d)

+    data = []
+    for index, pcd in enumerate(pcds):
        # get data info containing calib
-    data_infos = mmcv.load(ann_file)
-    image_idx = int(re.findall(r'\d+', image)[-1])  # xxx/sunrgbd_000017.jpg
-    for x in data_infos:
-        if int(x['image']['image_idx']) != image_idx:
-            continue
-        info = x
-        break
-    data = dict(
-        pts_filename=pcd,
-        img_prefix=osp.dirname(image),
-        img_info=dict(filename=osp.basename(image)),
+        img = imgs[index]
+        ann_file = ann_files[index]
+        data_info = mmcv.load(ann_file)[0]
+        # TODO: check the name consistency of
+        # image file and point cloud file
+        data_ = dict(
+            lidar_points=dict(lidar_path=pcd),
+            img_path=imgs[index],
+            img_prefix=osp.dirname(img),
+            img_info=dict(filename=osp.basename(img)),
            box_type_3d=box_type_3d,
-        box_mode_3d=box_mode_3d,
-        img_fields=[],
-        bbox3d_fields=[],
-        pts_mask_fields=[],
-        pts_seg_fields=[],
-        bbox_fields=[],
-        mask_fields=[],
-        seg_fields=[])
-    data = test_pipeline(data)
-    # TODO: this code is dataset-specific. Move lidar2img and
-    # depth2img to .pkl annotations in the future.
-    # LiDAR to image conversion
+            box_mode_3d=box_mode_3d)
+        data_ = test_pipeline(data_)
+        # LiDAR to image conversion for KITTI dataset
        if box_mode_3d == Box3DMode.LIDAR:
-            rect = info['calib']['R0_rect'].astype(np.float32)
-            Trv2c = info['calib']['Tr_velo_to_cam'].astype(np.float32)
-            P2 = info['calib']['P2'].astype(np.float32)
-            lidar2img = P2 @ rect @ Trv2c
-            data['img_metas'][0].data['lidar2img'] = lidar2img
-        # Depth to image conversion
+            data_['lidar2img'] = data_info['images']['CAM2']['lidar2img']
+        # Depth to image conversion for SUNRGBD dataset
        elif box_mode_3d == Box3DMode.DEPTH:
-            rt_mat = info['calib']['Rt']
-            # follow Coord3DMode.convert_point
-            rt_mat = np.array([[1, 0, 0], [0, 0, -1], [0, 1, 0]
-                               ]) @ rt_mat.transpose(1, 0)
-            depth2img = info['calib']['K'] @ rt_mat
-            data['img_metas'][0].data['depth2img'] = depth2img
-    data = collate([data], samples_per_gpu=1)
-    if next(model.parameters()).is_cuda:
-        # scatter to specified GPU
-        data = scatter(data, [device.index])[0]
-    else:
-        # this is a workaround to avoid the bug of MMDataParallel
-        data['img_metas'] = data['img_metas'][0].data
-        data['points'] = data['points'][0].data
-        data['img'] = data['img'][0].data
+            data_['depth2img'] = data_info['images']['CAM0']['depth2img']
+        data.append(data_)
    # forward the model
    with torch.no_grad():
-        result = model(return_loss=False, rescale=True, **data)
-    return result, data
+        results = model.test_step(data)

+    if not is_batch:
+        return results[0]
+    else:
+        return results
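
A sketch of the multi-modality call under the assumption this diff encodes: the annotation pickle's first record already carries the precomputed projection matrices (images/CAM2/lidar2img for KITTI-style LiDAR boxes, images/CAM0/depth2img for SUNRGBD-style depth boxes). All paths are hypothetical and model comes from init_model:

from mmdet3d.apis import inference_multi_modality_detector

result = inference_multi_modality_detector(
    model,
    pcds='demo/data/kitti/000008.bin',  # hypothetical point cloud
    imgs='demo/data/kitti/000008.png',  # hypothetical image
    ann_files='demo/data/kitti/000008.pkl')  # hypothetical annotation pickle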
-def inference_mono_3d_detector(model, image, ann_file):
+def inference_mono_3d_detector(model: nn.Module, imgs: ImagesType,
+                               ann_files: Union[str, Sequence[str]]):
"""Inference image with the monocular 3D detector.
Args:
model (nn.Module): The loaded detector.
image (str): Image files.
ann_file (str): Annotation files.
imgs (str, Sequence[str]):
Either image files or loaded images.
ann_files (str, Sequence[str]): Annotation files.
Returns:
tuple: Predicted results and data from pipeline.
:obj:`Det3DDataSample` or list[:obj:`Det3DDataSample`]:
If pcds is a list or tuple, the same length list type results
will be returned, otherwise return the detection results directly.
"""
+    if isinstance(imgs, (list, tuple)):
+        is_batch = True
+    else:
+        imgs = [imgs]
+        is_batch = False
+
    cfg = model.cfg
    device = next(model.parameters()).device  # model device
    # build the data pipeline
-    test_pipeline = deepcopy(cfg.data.test.pipeline)
+    test_pipeline = deepcopy(cfg.test_dataloader.dataset.pipeline)
    test_pipeline = Compose(test_pipeline)
-    box_type_3d, box_mode_3d = get_box_type(cfg.data.test.box_type_3d)
+    box_type_3d, box_mode_3d = \
+        get_box_type(cfg.test_dataloader.dataset.box_type_3d)

+    data = []
+    for index, img in enumerate(imgs):
+        ann_file = ann_files[index]
        # get data info containing calib
-    data_infos = mmcv.load(ann_file)
-    # find the info corresponding to this image
-    for x in data_infos['images']:
-        if osp.basename(x['file_name']) != osp.basename(image):
-            continue
-        img_info = x
-        break
-    data = dict(
-        img_prefix=osp.dirname(image),
-        img_info=dict(filename=osp.basename(image)),
+        data_info = mmcv.load(ann_file)[0]
+        data_ = dict(
+            img_path=img,
+            images=data_info['images'],
            box_type_3d=box_type_3d,
-        box_mode_3d=box_mode_3d,
-        img_fields=[],
-        bbox3d_fields=[],
-        pts_mask_fields=[],
-        pts_seg_fields=[],
-        bbox_fields=[],
-        mask_fields=[],
-        seg_fields=[])
-    # camera points to image conversion
-    if box_mode_3d == Box3DMode.CAM:
-        data['img_info'].update(dict(cam_intrinsic=img_info['cam_intrinsic']))
-    data = test_pipeline(data)
-    data = collate([data], samples_per_gpu=1)
-    if next(model.parameters()).is_cuda:
-        # scatter to specified GPU
-        data = scatter(data, [device.index])[0]
-    else:
-        # this is a workaround to avoid the bug of MMDataParallel
-        data['img_metas'] = data['img_metas'][0].data
-        data['img'] = data['img'][0].data
+            box_mode_3d=box_mode_3d)
+        data_ = test_pipeline(data_)
+        data.append(data_)
    # forward the model
    with torch.no_grad():
-        result = model(return_loss=False, rescale=True, **data)
-    return result, data
+        results = model.test_step(data)

+    if not is_batch:
+        return results[0]
+    else:
+        return results
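
Likewise for the monocular API, assuming an annotation pickle whose first record holds the camera metadata consumed via data_info['images'] (paths hypothetical, model from init_model):

from mmdet3d.apis import inference_mono_3d_detector

result = inference_mono_3d_detector(
    model,
    imgs='demo/data/nuscenes/sample.jpg',  # hypothetical image
    ann_files='demo/data/nuscenes/sample.pkl')  # hypothetical annotation pickle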
-def inference_segmentor(model, pcd):
+def inference_segmentor(model: nn.Module, pcds: PointsType):
"""Inference point cloud with the segmentor.
Args:
model (nn.Module): The loaded segmentor.
pcd (str): Point cloud files.
pcds (str, Sequence[str]):
Either point cloud files or loaded point cloud.
Returns:
tuple: Predicted results and data from pipeline.
:obj:`Det3DDataSample` or list[:obj:`Det3DDataSample`]:
If pcds is a list or tuple, the same length list type results
will be returned, otherwise return the detection results directly.
"""
+    if isinstance(pcds, (list, tuple)):
+        is_batch = True
+    else:
+        pcds = [pcds]
+        is_batch = False
+
    cfg = model.cfg
    device = next(model.parameters()).device  # model device

    # build the data pipeline
-    test_pipeline = deepcopy(cfg.data.test.pipeline)
+    test_pipeline = deepcopy(cfg.test_dataloader.dataset.pipeline)
    test_pipeline = Compose(test_pipeline)
-    data = dict(
-        pts_filename=pcd,
-        img_fields=[],
-        bbox3d_fields=[],
-        pts_mask_fields=[],
-        pts_seg_fields=[],
-        bbox_fields=[],
-        mask_fields=[],
-        seg_fields=[])
-    data = test_pipeline(data)
-    data = collate([data], samples_per_gpu=1)
-    if next(model.parameters()).is_cuda:
-        # scatter to specified GPU
-        data = scatter(data, [device.index])[0]
-    else:
-        # this is a workaround to avoid the bug of MMDataParallel
-        data['img_metas'] = data['img_metas'][0].data
-        data['points'] = data['points'][0].data
+    data = []
+    for pcd in pcds:
+        data_ = dict(lidar_points=dict(lidar_path=pcd))
+        data_ = test_pipeline(data_)
+        data.append(data_)
    # forward the model
    with torch.no_grad():
-        result = model(return_loss=False, rescale=True, **data)
-    return result, data
+        results = model.test_step(data)

+    if not is_batch:
+        return results[0]
+    else:
+        return results
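
The segmentor path mirrors inference_detector but only needs point cloud paths; a sketch with hypothetical ScanNet-style inputs, where model would be a loaded segmentor rather than a detector:

from mmdet3d.apis import inference_segmentor

seg_result = inference_segmentor(model, 'demo/data/scannet/scene0000_00.bin')
seg_results = inference_segmentor(model, ['scene0000_00.bin', 'scene0001_00.bin'])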
@@ -513,7 +513,7 @@ class LoadPointsFromFile(BaseTransform):
class LoadPointsFromDict(LoadPointsFromFile):
    """Load Points From Dict."""

-    def __call__(self, results):
+    def transform(self, results: dict) -> dict:
        assert 'points' in results
        return results
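
As the new transform makes explicit, LoadPointsFromDict only checks that loaded points are already present in the results dict; this is what lets inference_detector swap it in as pipeline[0] and feed in-memory arrays through the same pipeline. A sketch, assuming a hypothetical KITTI-format .bin file and the model from the earlier sketch:

import numpy as np

# load a raw point cloud; passing the array instead of a path makes
# inference_detector set pipeline[0].type = 'LoadPointsFromDict'
points = np.fromfile('demo/data/kitti/000008.bin', dtype=np.float32).reshape(-1, 4)
result = inference_detector(model, points)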