"git@developer.sourcefind.cn:wangsen/mineru.git" did not exist on "2fc6e9a358f19aab8331f0cc2ca1a8335b5f352f"
Unverified Commit 4bd7aa18 authored by ZLTJohn's avatar ZLTJohn Committed by GitHub
Browse files

[Feature] Support DETR3D in projects (#2170)



* detr3d for dev-1.x

* update README

* clean redundant comments

* correct dir name 'config' to 'configs'

* Modified codes according to first round review

* Fixed grid_mask.py

* move detr3d_transformer.py to /layers

* reorganize files

* rename 'mmdet3d_plugin' to 'detr3d'

* Modify configs and update latest codebase

* rename configs

* forgot to save detr3d_head.py

* change data_root in configs

* try to fix lint

* correct config files

* reorganize dirs

* Fixing lint

* fixing lint

* fix lint

* Fix lint

* delete unused code

* rename configs

* delete transfrom_3d.py

* fix bugs in configs

* fix lint

* update readme

* edit config file

* rename detr3d/ to DETR3D/

* fix typos in readme

* polish readme

* fix config link in readme

---------
Co-authored-by: JingweiZhang12 <zjw18@mails.tsinghua.edu.cn>
# DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries
> [DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries](https://arxiv.org/abs/2110.06922)
<!-- [ALGORITHM] -->
## Abstract
We introduce a framework for multi-camera 3D object detection. In
contrast to existing works, which estimate 3D bounding boxes directly from
monocular images or use depth prediction networks to generate input for 3D object
detection from 2D information, our method manipulates predictions directly
in 3D space. Our architecture extracts 2D features from multiple camera images
and then uses a sparse set of 3D object queries to index into these 2D features,
linking 3D positions to multi-view images using camera transformation matrices.
Finally, our model makes a bounding box prediction per object query, using a
set-to-set loss to measure the discrepancy between the ground-truth and the prediction.
This top-down approach outperforms its bottom-up counterpart in which
object bounding box prediction follows per-pixel depth estimation, since it does
not suffer from the compounding error introduced by a depth prediction model.
Moreover, our method does not require post-processing such as non-maximum
suppression, dramatically improving inference speed. We achieve state-of-the-art
performance on the nuScenes autonomous driving benchmark.
<div align=center>
<img src="https://user-images.githubusercontent.com/67246790/209751755-3d0f0ad5-6a39-4d14-a1c7-346b5c228a1b.png" width="800"/>
</div>
## Introduction
This directory contains the implementation of [DETR3D](https://arxiv.org/abs/2110.06922). Our implementation is built on top of MMDetection3D.
We have updated DETR3D to be compatible with the latest mmdet3d-dev1.x. The codebase and config files have been adapted to the new mmdet3d version. All previously released pretrained models are verified, with results listed below. However, newly trained models are yet to be uploaded.
## Train
1. Download the [pretrained backbone weights](https://drive.google.com/drive/folders/1h5bDg7Oh9hKvkFL-dRhu5-ahrEp2lRNN?usp=sharing) to `pretrained/`
2. For example, to train DETR3D on 8 GPUs, please use
```bash
bash tools/dist_train.sh projects/DETR3D/configs/detr3d_r101_gridmask.py 8
```
## Evaluation using pretrained models
1. Download the weights accordingly.
| Backbone | mAP | NDS | Download |
| :----------------------------------------------------------------------------------------------------------: | :--: | :--: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: |
| [DETR3D, ResNet101 w/ DCN(old)](./configs/detr3d_r101_gridmask.py) | 34.7 | 42.2 | [model](https://drive.google.com/file/d/1YWX-jIS6fxG5_JKUBNVcZtsPtShdjE4O/view?usp=sharing) \| [log](https://drive.google.com/file/d/1uvrf42seV4XbWtir-2XjrdGUZ2Qbykid/view?usp=sharing) |
| [above, + CBGS(old)](./configs/detr3d_r101_gridmask_cbgs.py) | 34.9 | 43.4 | [model](https://drive.google.com/file/d/1sXPFiA18K9OMh48wkk9dF1MxvBDUCj2t/view?usp=sharing) \| [log](https://drive.google.com/file/d/1NJNggvFGqA423usKanqbsZVE_CzF4ltT/view?usp=sharing) |
| [DETR3D, VoVNet on trainval, evaluation on test set(old)](./configs/detr3d_vovnet_gridmask_trainval_cbgs.py) | 41.2 | 47.9 | [model](https://drive.google.com/file/d/1d5FaqoBdUH6dQC3hBKEZLcqbvWK0p9Zv/view?usp=sharing) \| [log](https://drive.google.com/file/d/1ONEMm_2W9MZAutjQk1UzaqRywz5PMk3p/view?usp=sharing) |
2. Convert the old weights
From v0.17.3 to v1.0.0, mmdet3d changed its bbox representation. Given a box (x, y, z, θ), we have x_new = y_old, y_new = x_old, θ_new = -θ_old - π/2.
The current pretrained models (ending with '(old)') were trained on v0.17.3. Our regression branch outputs (cx, cy, w, l, cz, h, sin(θ), cos(θ), vx, vy). For a previous model that outputs y = \[y0, y1, y2, y3, y4, y5, y6, y7, y8, y9\], the new output is y_new = \[..., y3, y2, ..., -y7, -y6\], so the final Linear layer's weights must be changed accordingly.
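As a minimal sketch of what this permutation means for a single final regression `Linear` layer (a simplified illustration only; `convert_reg_linear` is a hypothetical helper, and the real conversion is performed by `old_detr3d_converter.py` below):
```python
# Sketch: swap the w/l rows and the sin/cos rows of the final regression
# Linear layer, negating the swapped sin/cos rows, to match the v1.0.0
# box convention described above. Not the actual converter script.
import torch


def convert_reg_linear(weight: torch.Tensor, bias: torch.Tensor):
    """weight: [10, C], bias: [10], ordered as
    (cx, cy, w, l, cz, h, sin θ, cos θ, vx, vy)."""
    perm = [0, 1, 3, 2, 4, 5, 7, 6, 8, 9]  # y_new = [..., y3, y2, ..., y7, y6, ...]
    sign = torch.ones(10)
    sign[6] = sign[7] = -1.0               # negate the swapped sin/cos rows
    return weight[perm] * sign[:, None], bias[perm] * sign
```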
To convert the old weights, please use
```bash
python projects/DETR3D/detr3d/old_detr3d_converter.py ${CHECKPOINT_DIR}/detr3d_resnet101.pth ${CHECKPOINT_DIR}/detr3d_r101_v1.0.0.pth --code_size 10
```
3. Testing
To test, use:
```bash
bash tools/dist_test.sh projects/DETR3D/configs/detr3d_r101_gridmask.py ${CHECKPOINT_DIR}/detr3d_r101_v1.0.0.pth 8
```
<!-- Current pretrained models (ending with '(old)') are trained on v0.17.3, and we make them compatible with the new mmdet3d by rewriting the `_load_from_state_dict` method in [`detr3d.py`](./detr3d/detr3d.py) -->
## Citation
If you find this repo useful for your research, please consider citing the paper:
```
@inproceedings{
detr3d,
title={DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries},
author={Wang, Yue and Guizilini, Vitor and Zhang, Tianyuan and Wang, Yilun and Zhao, Hang and Solomon, Justin M.},
booktitle={The Conference on Robot Learning ({CoRL})},
year={2021}
}
```
## Checklist
<!-- Here is a checklist illustrating a usual development workflow of a successful project, and also serves as an overview of this project's progress. The PIC (person in charge) or contributors of this project should check all the items that they believe have been finished, which will further be verified by codebase maintainers via a PR.
OpenMMLab's maintainer will review the code to ensure the project's quality. Reaching the first milestone means that this project suffices the minimum requirement of being merged into 'projects/'. But this project is only eligible to become a part of the core package upon attaining the last milestone.
Note that keeping this section up-to-date is crucial not only for this project's developers but the entire community, since there might be some other contributors joining this project and deciding their starting point from this list. It also helps maintainers accurately estimate time and effort on further code polishing, if needed.
A project does not necessarily have to be finished in a single PR, but it's essential for the project to at least reach the first milestone in its very first PR. -->
- [ ] Milestone 1: PR-ready, and acceptable to be one of the `projects/`.
- [ ] Finish the code
<!-- The code's design shall follow existing interfaces and convention. For example, each model component should be registered into `mmdet3d.registry.MODELS` and configurable via a config file. -->
- [ ] Basic docstrings & proper citation
<!-- Each major object should contain a docstring, describing its functionality and arguments. If you have adapted the code from other open-source projects, don't forget to cite the source project in docstring and make sure your behavior is not against its license. Typically, we do not accept any code snippet under GPL license. [A Short Guide to Open Source Licenses](https://medium.com/nationwide-technology/a-short-guide-to-open-source-licenses-cf5b1c329edd) -->
- [ ] Test-time correctness
<!-- If you are reproducing the result from a paper, make sure your model's inference-time performance matches that in the original paper. The weights usually could be obtained by simply renaming the keys in the official pre-trained weights. This test could be skipped though, if you are able to prove the training-time correctness and check the second milestone. -->
- [ ] A full README
<!-- As this template does. -->
- [ ] Milestone 2: Indicates a successful model implementation.
- [ ] Training-time correctness
<!-- If you are reproducing the result from a paper, checking this item means that you should have trained your model from scratch based on the original paper's specification and verified that the final result matches the report within a minor error range. -->
- [ ] Milestone 3: Good to be a part of our core package!
- [ ] Type hints and docstrings
<!-- Ideally *all* the methods should have [type hints](https://www.pythontutorial.net/python-basics/python-type-hints/) and [docstrings](https://google.github.io/styleguide/pyguide.html#381-docstrings). [Example](https://github.com/open-mmlab/mmdetection3d/blob/dev-1.x/mmdet3d/models/detectors/fcos_mono3d.py) -->
- [ ] Unit tests
<!-- Unit tests for each module are required. [Example](https://github.com/open-mmlab/mmdetection3d/blob/dev-1.x/tests/test_models/test_dense_heads/test_fcos_mono3d_head.py) -->
- [ ] Code polishing
<!-- Refactor your code according to reviewer's comment. -->
- [ ] Metafile.yml
<!-- It will be parsed by MIM and Inferencer. [Example](https://github.com/open-mmlab/mmdetection3d/blob/dev-1.x/configs/fcos3d/metafile.yml) -->
- [ ] Move your modules into the core package following the codebase's file hierarchy structure.
<!-- In particular, you may have to refactor this README into a standard one. [Example](/configs/textdet/dbnet/README.md) -->
- [ ] Refactor your modules into the core package following the codebase's file hierarchy structure.
_base_ = [
# 'mmdet3d::_base_/datasets/nus-3d.py',
'mmdet3d::_base_/default_runtime.py'
]
custom_imports = dict(imports=['projects.DETR3D.detr3d'])
# If point cloud range is changed, the models should also change their point
# cloud range accordingly
point_cloud_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]
voxel_size = [0.2, 0.2, 8]
img_norm_cfg = dict(
mean=[103.530, 116.280, 123.675], std=[1.0, 1.0, 1.0], bgr_to_rgb=False)
# For nuScenes we usually do 10-class detection
class_names = [
'car', 'truck', 'construction_vehicle', 'bus', 'trailer', 'barrier',
'motorcycle', 'bicycle', 'pedestrian', 'traffic_cone'
]
input_modality = dict(
use_lidar=False,
use_camera=True,
use_radar=False,
use_map=False,
use_external=False)
# this means type='DETR3D' will be processed as 'mmdet3d.DETR3D'
default_scope = 'mmdet3d'
model = dict(
type='DETR3D',
use_grid_mask=True,
data_preprocessor=dict(
type='Det3DDataPreprocessor', **img_norm_cfg, pad_size_divisor=32),
img_backbone=dict(
type='mmdet.ResNet',
depth=101,
num_stages=4,
out_indices=(0, 1, 2, 3),
frozen_stages=1,
norm_cfg=dict(type='BN2d', requires_grad=False),
norm_eval=True,
style='caffe',
dcn=dict(type='DCNv2', deform_groups=1, fallback_on_stride=False),
stage_with_dcn=(False, False, True, True)),
img_neck=dict(
type='mmdet.FPN',
in_channels=[256, 512, 1024, 2048],
out_channels=256,
start_level=1,
add_extra_convs='on_output',
num_outs=4,
relu_before_extra_convs=True),
pts_bbox_head=dict(
type='DETR3DHead',
num_query=900,
num_classes=10,
in_channels=256,
sync_cls_avg_factor=True,
with_box_refine=True,
as_two_stage=False,
transformer=dict(
type='Detr3DTransformer',
decoder=dict(
type='Detr3DTransformerDecoder',
num_layers=6,
return_intermediate=True,
transformerlayers=dict(
type='mmdet.DetrTransformerDecoderLayer',
attn_cfgs=[
dict(
type='MultiheadAttention', # mmcv.
embed_dims=256,
num_heads=8,
dropout=0.1),
dict(
type='Detr3DCrossAtten',
pc_range=point_cloud_range,
num_points=1,
embed_dims=256)
],
feedforward_channels=512,
ffn_dropout=0.1,
operation_order=('self_attn', 'norm', 'cross_attn', 'norm',
'ffn', 'norm')))),
bbox_coder=dict(
type='NMSFreeCoder',
post_center_range=[-61.2, -61.2, -10.0, 61.2, 61.2, 10.0],
pc_range=point_cloud_range,
max_num=300,
voxel_size=voxel_size,
num_classes=10),
positional_encoding=dict(
type='mmdet.SinePositionalEncoding',
num_feats=128,
normalize=True,
offset=-0.5),
loss_cls=dict(
type='mmdet.FocalLoss',
use_sigmoid=True,
gamma=2.0,
alpha=0.25,
loss_weight=2.0),
loss_bbox=dict(type='mmdet.L1Loss', loss_weight=0.25),
loss_iou=dict(type='mmdet.GIoULoss', loss_weight=0.0)),
# model training and testing settings
train_cfg=dict(
pts=dict(
grid_size=[512, 512, 1],
voxel_size=voxel_size,
point_cloud_range=point_cloud_range,
out_size_factor=4,
assigner=dict(
type='HungarianAssigner3D',
cls_cost=dict(type='mmdet.FocalLossCost', weight=2.0),
reg_cost=dict(type='BBox3DL1Cost', weight=0.25),
# ↓ Fake cost, only to be compatible with the DETR head
iou_cost=dict(type='mmdet.IoUCost', weight=0.0),
pc_range=point_cloud_range))))
dataset_type = 'NuScenesDataset'
data_root = 'data/nuscenes/'
test_transforms = [
dict(
type='RandomResize3D',
scale=(1600, 900),
ratio_range=(1., 1.),
keep_ratio=True)
]
train_transforms = [dict(type='PhotoMetricDistortion3D')] + test_transforms
file_client_args = dict(backend='disk')
train_pipeline = [
dict(type='LoadMultiViewImageFromFiles', to_float32=True, num_views=6),
dict(
type='LoadAnnotations3D',
with_bbox_3d=True,
with_label_3d=True,
with_attr_label=False),
dict(type='MultiViewWrapper', transforms=train_transforms),
dict(type='ObjectRangeFilter', point_cloud_range=point_cloud_range),
dict(type='ObjectNameFilter', classes=class_names),
dict(type='Pack3DDetInputs', keys=['img', 'gt_bboxes_3d', 'gt_labels_3d'])
]
test_pipeline = [
dict(type='LoadMultiViewImageFromFiles', to_float32=True, num_views=6),
dict(type='MultiViewWrapper', transforms=test_transforms),
dict(type='Pack3DDetInputs', keys=['img'])
]
metainfo = dict(classes=class_names)
data_prefix = dict(
pts='',
CAM_FRONT='samples/CAM_FRONT',
CAM_FRONT_LEFT='samples/CAM_FRONT_LEFT',
CAM_FRONT_RIGHT='samples/CAM_FRONT_RIGHT',
CAM_BACK='samples/CAM_BACK',
CAM_BACK_RIGHT='samples/CAM_BACK_RIGHT',
CAM_BACK_LEFT='samples/CAM_BACK_LEFT')
train_dataloader = dict(
batch_size=1,
num_workers=4,
persistent_workers=True,
drop_last=False,
sampler=dict(type='DefaultSampler', shuffle=True),
dataset=dict(
type=dataset_type,
data_root=data_root,
ann_file='nuscenes_infos_train.pkl',
pipeline=train_pipeline,
load_type='frame_based',
metainfo=metainfo,
modality=input_modality,
test_mode=False,
data_prefix=data_prefix,
# we use box_type_3d='LiDAR' in kitti and nuscenes dataset
# and box_type_3d='Depth' in sunrgbd and scannet dataset.
box_type_3d='LiDAR'))
val_dataloader = dict(
batch_size=1,
num_workers=4,
persistent_workers=True,
drop_last=False,
sampler=dict(type='DefaultSampler', shuffle=False),
dataset=dict(
type=dataset_type,
data_root=data_root,
ann_file='nuscenes_infos_val.pkl',
load_type='frame_based',
pipeline=test_pipeline,
metainfo=metainfo,
modality=input_modality,
test_mode=True,
data_prefix=data_prefix,
box_type_3d='LiDAR'))
test_dataloader = val_dataloader
val_evaluator = dict(
type='NuScenesMetric',
data_root=data_root,
ann_file=data_root + 'nuscenes_infos_val.pkl',
metric='bbox')
test_evaluator = val_evaluator
optim_wrapper = dict(
type='OptimWrapper',
optimizer=dict(type='AdamW', lr=2e-4, weight_decay=0.01),
paramwise_cfg=dict(custom_keys={'img_backbone': dict(lr_mult=0.1)}),
clip_grad=dict(max_norm=35, norm_type=2),
)
# learning policy
param_scheduler = [
dict(
type='LinearLR',
start_factor=1.0 / 3,
by_epoch=False,
begin=0,
end=500),
dict(
type='CosineAnnealingLR',
by_epoch=True,
begin=0,
end=24,
T_max=24,
eta_min_ratio=1e-3)
]
total_epochs = 24
train_cfg = dict(
type='EpochBasedTrainLoop', max_epochs=total_epochs, val_interval=2)
val_cfg = dict(type='ValLoop')
test_cfg = dict(type='TestLoop')
default_hooks = dict(
checkpoint=dict(
type='CheckpointHook', interval=1, max_keep_ckpts=1, save_last=True))
load_from = 'ckpts/fcos3d.pth'
# setuptools 65 downgrades to 58.
# In mmlab-node we use setuptools 61 and no errors occur
vis_backends = [dict(type='TensorboardVisBackend')]
visualizer = dict(
type='Det3DLocalVisualizer', vis_backends=vis_backends, name='visualizer')
_base_ = ['./detr3d_r101_gridmask.py']
custom_imports = dict(imports=['projects.DETR3D.detr3d'])
# If point cloud range is changed, the models should also change their point
# cloud range accordingly
point_cloud_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0]
voxel_size = [0.2, 0.2, 8]
img_norm_cfg = dict(
mean=[103.530, 116.280, 123.675], std=[1.0, 1.0, 1.0], bgr_to_rgb=False)
# For nuScenes we usually do 10-class detection
class_names = [
'car', 'truck', 'construction_vehicle', 'bus', 'trailer', 'barrier',
'motorcycle', 'bicycle', 'pedestrian', 'traffic_cone'
]
input_modality = dict(
use_lidar=False,
use_camera=True,
use_radar=False,
use_map=False,
use_external=False)
dataset_type = 'NuScenesDataset'
data_root = 'data/nuscenes/'
test_transforms = [
dict(
type='RandomResize3D',
scale=(1600, 900),
ratio_range=(1., 1.),
keep_ratio=True)
]
train_transforms = [dict(type='PhotoMetricDistortion3D')] + test_transforms
train_pipeline = [
dict(type='LoadMultiViewImageFromFiles', to_float32=True, num_views=6),
dict(
type='LoadAnnotations3D',
with_bbox_3d=True,
with_label_3d=True,
with_attr_label=False),
dict(type='MultiViewWrapper', transforms=train_transforms),
dict(type='ObjectRangeFilter', point_cloud_range=point_cloud_range),
dict(type='ObjectNameFilter', classes=class_names),
dict(type='Pack3DDetInputs', keys=['img', 'gt_bboxes_3d', 'gt_labels_3d'])
]
metainfo = dict(classes=class_names)
data_prefix = dict(
pts='',
CAM_FRONT='samples/CAM_FRONT',
CAM_FRONT_LEFT='samples/CAM_FRONT_LEFT',
CAM_FRONT_RIGHT='samples/CAM_FRONT_RIGHT',
CAM_BACK='samples/CAM_BACK',
CAM_BACK_RIGHT='samples/CAM_BACK_RIGHT',
CAM_BACK_LEFT='samples/CAM_BACK_LEFT')
train_dataloader = dict(
_delete_=True,
batch_size=1,
num_workers=4,
persistent_workers=True,
drop_last=False,
sampler=dict(type='DefaultSampler', shuffle=True),
dataset=dict(
type='CBGSDataset',
dataset=dict(
type=dataset_type,
data_root=data_root,
ann_file='nuscenes_infos_train.pkl',
pipeline=train_pipeline,
load_type='frame_based',
metainfo=metainfo,
modality=input_modality,
test_mode=False,
data_prefix=data_prefix,
# we use box_type_3d='LiDAR' in kitti and nuscenes dataset
# and box_type_3d='Depth' in sunrgbd and scannet dataset.
box_type_3d='LiDAR')))
_base_ = ['./detr3d_r101_gridmask_cbgs.py']
custom_imports = dict(imports=['projects.DETR3D.detr3d'])
img_norm_cfg = dict(
mean=[103.530, 116.280, 123.675],
std=[57.375, 57.120, 58.395],
bgr_to_rgb=False)
# this means type='DETR3D' will be processed as 'mmdet3d.DETR3D'
default_scope = 'mmdet3d'
model = dict(
type='DETR3D',
use_grid_mask=True,
data_preprocessor=dict(
type='Det3DDataPreprocessor', **img_norm_cfg, pad_size_divisor=32),
img_backbone=dict(
_delete_=True,
type='VoVNet',
spec_name='V-99-eSE',
norm_eval=True,
frozen_stages=1,
input_ch=3,
out_features=['stage2', 'stage3', 'stage4', 'stage5']),
img_neck=dict(
type='mmdet.FPN',
in_channels=[256, 512, 768, 1024],
out_channels=256,
start_level=0,
add_extra_convs='on_output',
num_outs=4,
relu_before_extra_convs=True))
train_dataloader = dict(
dataset=dict(
type='CBGSDataset',
dataset=dict(ann_file='nuscenes_infos_trainval.pkl')))
load_from = 'ckpts/dd3d_det_final.pth'
find_unused_parameters = True
from .detr3d import DETR3D
from .detr3d_head import DETR3DHead
from .detr3d_transformer import (Detr3DCrossAtten, Detr3DTransformer,
Detr3DTransformerDecoder)
from .hungarian_assigner_3d import HungarianAssigner3D
from .match_cost import BBox3DL1Cost
from .nms_free_coder import NMSFreeCoder
from .vovnet import VoVNet
__all__ = [
'VoVNet', 'DETR3D', 'DETR3DHead', 'Detr3DTransformer',
'Detr3DTransformerDecoder', 'Detr3DCrossAtten', 'HungarianAssigner3D',
'BBox3DL1Cost', 'NMSFreeCoder'
]
from typing import Dict, List, Optional
import torch
from torch import Tensor
from mmdet3d.models.detectors.mvx_two_stage import MVXTwoStageDetector
from mmdet3d.registry import MODELS
from mmdet3d.structures import Det3DDataSample
from mmdet3d.structures.bbox_3d.utils import get_lidar2img
from .grid_mask import GridMask
@MODELS.register_module()
class DETR3D(MVXTwoStageDetector):
"""DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries
Args:
data_preprocessor (dict or ConfigDict, optional): The pre-process
config of :class:`Det3DDataPreprocessor`. Defaults to None.
use_grid_mask (bool): Data augmentation. Whether to mask out some
grids during extract_img_feat. Defaults to False.
img_backbone (dict, optional): Backbone for extracting image
features. Defaults to None.
img_neck (dict, optional): Neck for extracting
image features. Defaults to None.
pts_bbox_head (dict, optional): Bbox head of
DETR3D. Defaults to None.
train_cfg (dict, optional): Train config of the model.
Defaults to None.
test_cfg (dict, optional): Test config of the model.
Defaults to None.
init_cfg (dict, optional): Initialization config of the
model. Defaults to None.
"""
def __init__(self,
data_preprocessor=None,
use_grid_mask=False,
img_backbone=None,
img_neck=None,
pts_bbox_head=None,
train_cfg=None,
test_cfg=None,
pretrained=None):
super(DETR3D, self).__init__(
img_backbone=img_backbone,
img_neck=img_neck,
pts_bbox_head=pts_bbox_head,
train_cfg=train_cfg,
test_cfg=test_cfg,
data_preprocessor=data_preprocessor)
self.grid_mask = GridMask(
True, True, rotate=1, offset=False, ratio=0.5, mode=1, prob=0.7)
self.use_grid_mask = use_grid_mask
def extract_img_feat(self, img: Tensor,
batch_input_metas: List[dict]) -> List[Tensor]:
"""Extract features from images.
Args:
img (tensor): Batched multi-view image tensor with
shape (B, N, C, H, W).
batch_input_metas (list[dict]): Meta information of multiple inputs
in a batch.
Returns:
list[tensor]: multi-level image features.
"""
B = img.size(0)
if img is not None:
input_shape = img.shape[-2:] # bs nchw
# update real input shape of each single img
for img_meta in batch_input_metas:
img_meta.update(input_shape=input_shape)
if img.dim() == 5 and img.size(0) == 1:
img.squeeze_()
elif img.dim() == 5 and img.size(0) > 1:
B, N, C, H, W = img.size()
img = img.view(B * N, C, H, W)
if self.use_grid_mask:
img = self.grid_mask(img) # mask out some grids
img_feats = self.img_backbone(img)
if isinstance(img_feats, dict):
img_feats = list(img_feats.values())
else:
return None
if self.with_img_neck:
img_feats = self.img_neck(img_feats)
img_feats_reshaped = []
for img_feat in img_feats:
BN, C, H, W = img_feat.size()
img_feats_reshaped.append(img_feat.view(B, int(BN / B), C, H, W))
return img_feats_reshaped
def extract_feat(self, batch_inputs_dict: Dict,
batch_input_metas: List[dict]) -> List[Tensor]:
"""Extract features from images.
Refer to self.extract_img_feat()
"""
imgs = batch_inputs_dict.get('imgs', None)
img_feats = self.extract_img_feat(imgs, batch_input_metas)
return img_feats
def _forward(self):
raise NotImplementedError('tensor mode is yet to add')
# original forward_train
def loss(self, batch_inputs_dict: Dict[List, Tensor],
batch_data_samples: List[Det3DDataSample],
**kwargs) -> List[Det3DDataSample]:
"""
Args:
batch_inputs_dict (dict): The model input dict which include
`imgs` keys.
- imgs (torch.Tensor): Tensor of batched multi-view images.
It has shape (B, N, C, H ,W)
batch_data_samples (List[obj:`Det3DDataSample`]): The Data Samples
It usually includes information such as `gt_instance_3d`.
Returns:
dict[str, Tensor]: A dictionary of loss components.
"""
batch_input_metas = [item.metainfo for item in batch_data_samples]
batch_input_metas = self.add_lidar2img(batch_input_metas)
img_feats = self.extract_feat(batch_inputs_dict, batch_input_metas)
outs = self.pts_bbox_head(img_feats, batch_input_metas, **kwargs)
batch_gt_instances_3d = [
item.gt_instances_3d for item in batch_data_samples
]
loss_inputs = [batch_gt_instances_3d, outs]
losses_pts = self.pts_bbox_head.loss_by_feat(*loss_inputs)
return losses_pts
# original simple_test
def predict(self, batch_inputs_dict: Dict[str, Optional[Tensor]],
batch_data_samples: List[Det3DDataSample],
**kwargs) -> List[Det3DDataSample]:
"""Forward of testing.
Args:
batch_inputs_dict (dict): The model input dict which include
`imgs` keys.
- imgs (torch.Tensor): Tensor of batched multi-view images.
It has shape (B, N, C, H ,W)
batch_data_samples (List[:obj:`Det3DDataSample`]): The Data
Samples. It usually includes information such as
`gt_instance_3d`.
Returns:
list[:obj:`Det3DDataSample`]: Detection results of the
input sample. Each Det3DDataSample usually contain
'pred_instances_3d'. And the ``pred_instances_3d`` usually
contains following keys.
- scores_3d (Tensor): Classification scores, has a shape
(num_instances, )
- labels_3d (Tensor): Labels of bboxes, has a shape
(num_instances, ).
- bbox_3d (:obj:`BaseInstance3DBoxes`): Prediction of bboxes,
contains a tensor with shape (num_instances, 9).
"""
batch_input_metas = [item.metainfo for item in batch_data_samples]
batch_input_metas = self.add_lidar2img(batch_input_metas)
img_feats = self.extract_feat(batch_inputs_dict, batch_input_metas)
outs = self.pts_bbox_head(img_feats, batch_input_metas)
results_list_3d = self.pts_bbox_head.predict_by_feat(
outs, batch_input_metas, **kwargs)
# change the bboxes' format
detsamples = self.add_pred_to_datasample(batch_data_samples,
results_list_3d)
return detsamples
# may need speed-up
def add_lidar2img(self, batch_input_metas: List[Dict]) -> List[Dict]:
"""add 'lidar2img' transformation matrix into batch_input_metas.
Args:
batch_input_metas (list[dict]): Meta information of multiple inputs
in a batch.
Returns:
batch_input_metas (list[dict]): Meta info with lidar2img added
"""
for meta in batch_input_metas:
l2i = list()
for i in range(len(meta['cam2img'])):
c2i = torch.tensor(meta['cam2img'][i]).double()
l2c = torch.tensor(meta['lidar2cam'][i]).double()
l2i.append(get_lidar2img(c2i, l2c).float().numpy())
meta['lidar2img'] = l2i
return batch_input_metas
import copy
from typing import Dict, List, Tuple
import torch
import torch.nn as nn
from mmcv.cnn import Linear
from mmdet.models.dense_heads import DETRHead
from mmdet.models.layers import inverse_sigmoid
from mmdet.models.utils import multi_apply
from mmdet.utils import InstanceList, OptInstanceList, reduce_mean
from mmengine.model import bias_init_with_prob
from mmengine.structures import InstanceData
from torch import Tensor
from mmdet3d.registry import MODELS, TASK_UTILS
from .util import normalize_bbox
@MODELS.register_module()
class DETR3DHead(DETRHead):
"""Head of DETR3D.
Args:
with_box_refine (bool): Whether to refine the reference points
in the decoder. Defaults to False.
as_two_stage (bool) : Whether to generate the proposal from
the outputs of encoder.
transformer (obj:`ConfigDict`): ConfigDict is used for building
the Encoder and Decoder.
bbox_coder (obj:`ConfigDict`): Configs to build the bbox coder
num_cls_fcs (int): the number of layers in the cls and reg branches.
code_weights (List[float]): loss weights of
(cx, cy, l, w, cz, h, sin(φ), cos(φ), vx, vy).
code_size (int): size of code_weights.
"""
def __init__(
self,
*args,
with_box_refine=False,
as_two_stage=False,
transformer=None,
bbox_coder=None,
num_cls_fcs=2,
code_weights=[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.2, 0.2],
code_size=10,
**kwargs):
self.with_box_refine = with_box_refine
self.as_two_stage = as_two_stage
if self.as_two_stage:
transformer['as_two_stage'] = self.as_two_stage
self.code_size = code_size
self.code_weights = code_weights
self.bbox_coder = TASK_UTILS.build(bbox_coder)
self.pc_range = self.bbox_coder.pc_range
self.num_cls_fcs = num_cls_fcs - 1
super(DETR3DHead, self).__init__(
*args, transformer=transformer, **kwargs)
# DETR sampling=False, so use PseudoSampler, format the result
sampler_cfg = dict(type='PseudoSampler')
self.sampler = TASK_UTILS.build(sampler_cfg)
self.code_weights = nn.Parameter(
torch.tensor(self.code_weights, requires_grad=False),
requires_grad=False)
# forward_train -> loss
def _init_layers(self):
"""Initialize classification branch and regression branch of head."""
cls_branch = []
for _ in range(self.num_reg_fcs):
cls_branch.append(Linear(self.embed_dims, self.embed_dims))
cls_branch.append(nn.LayerNorm(self.embed_dims))
cls_branch.append(nn.ReLU(inplace=True))
cls_branch.append(Linear(self.embed_dims, self.cls_out_channels))
fc_cls = nn.Sequential(*cls_branch)
reg_branch = []
for _ in range(self.num_reg_fcs):
reg_branch.append(Linear(self.embed_dims, self.embed_dims))
reg_branch.append(nn.ReLU())
reg_branch.append(Linear(self.embed_dims, self.code_size))
reg_branch = nn.Sequential(*reg_branch)
def _get_clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
# last reg_branch is used to generate proposal from
# encode feature map when as_two_stage is True.
num_pred = (self.transformer.decoder.num_layers + 1) if \
self.as_two_stage else self.transformer.decoder.num_layers
if self.with_box_refine:
self.cls_branches = _get_clones(fc_cls, num_pred)
self.reg_branches = _get_clones(reg_branch, num_pred)
else:
self.cls_branches = nn.ModuleList(
[fc_cls for _ in range(num_pred)])
self.reg_branches = nn.ModuleList(
[reg_branch for _ in range(num_pred)])
if not self.as_two_stage:
self.query_embedding = nn.Embedding(self.num_query,
self.embed_dims * 2)
def init_weights(self):
"""Initialize weights of the DeformDETR head."""
self.transformer.init_weights()
if self.loss_cls.use_sigmoid:
bias_init = bias_init_with_prob(0.01)
for m in self.cls_branches:
nn.init.constant_(m[-1].bias, bias_init)
def forward(self, mlvl_feats: List[Tensor], img_metas: List[Dict],
**kwargs) -> Dict[str, Tensor]:
"""Forward function.
Args:
mlvl_feats (List[Tensor]): Features from the upstream
network, each is a 5D-tensor with shape
(B, N, C, H, W).
Returns:
all_cls_scores (Tensor): Outputs from the classification head,
shape [nb_dec, bs, num_query, cls_out_channels]. Note
cls_out_channels should include background.
all_bbox_preds (Tensor): Sigmoid outputs from the regression
head with normalized coordinate format
(cx, cy, l, w, cz, h, sin(φ), cos(φ), vx, vy).
Shape [nb_dec, bs, num_query, 10].
"""
query_embeds = self.query_embedding.weight
hs, init_reference, inter_references = self.transformer(
mlvl_feats,
query_embeds,
reg_branches=self.reg_branches if self.with_box_refine else None,
img_metas=img_metas,
**kwargs)
hs = hs.permute(0, 2, 1, 3)
outputs_classes = []
outputs_coords = []
for lvl in range(hs.shape[0]):
if lvl == 0:
reference = init_reference
else:
reference = inter_references[lvl - 1]
reference = inverse_sigmoid(reference)
outputs_class = self.cls_branches[lvl](hs[lvl])
tmp = self.reg_branches[lvl](hs[lvl]) # shape: ([B, num_q, 10])
# TODO: check the shape of reference
assert reference.shape[-1] == 3
tmp[..., 0:2] += reference[..., 0:2]
tmp[..., 0:2] = tmp[..., 0:2].sigmoid()
tmp[..., 4:5] += reference[..., 2:3]
tmp[..., 4:5] = tmp[..., 4:5].sigmoid()
tmp[..., 0:1] = \
tmp[..., 0:1] * (self.pc_range[3] - self.pc_range[0]) \
+ self.pc_range[0]
tmp[..., 1:2] = \
tmp[..., 1:2] * (self.pc_range[4] - self.pc_range[1]) \
+ self.pc_range[1]
tmp[..., 4:5] = \
tmp[..., 4:5] * (self.pc_range[5] - self.pc_range[2]) \
+ self.pc_range[2]
# TODO: check if using sigmoid
outputs_coord = tmp
outputs_classes.append(outputs_class)
outputs_coords.append(outputs_coord)
outputs_classes = torch.stack(outputs_classes)
outputs_coords = torch.stack(outputs_coords)
outs = {
'all_cls_scores': outputs_classes,
'all_bbox_preds': outputs_coords,
'enc_cls_scores': None,
'enc_bbox_preds': None,
}
return outs
def _get_target_single(
self,
cls_score: Tensor, # [query, num_cls]
bbox_pred: Tensor, # [query, 10]
gt_instances_3d: InstanceList) -> Tuple[Tensor, ...]:
"""Compute regression and classification targets for a single image."""
# turn bottom center into gravity center
gt_bboxes = gt_instances_3d.bboxes_3d # [num_gt, 9]
gt_bboxes = torch.cat(
(gt_bboxes.gravity_center, gt_bboxes.tensor[:, 3:]), dim=1)
gt_labels = gt_instances_3d.labels_3d # [num_gt, num_cls]
# assigner and sampler: PseudoSampler
assign_result = self.assigner.assign(
bbox_pred, cls_score, gt_bboxes, gt_labels, gt_bboxes_ignore=None)
sampling_result = self.sampler.sample(
assign_result, InstanceData(priors=bbox_pred),
InstanceData(bboxes_3d=gt_bboxes))
pos_inds = sampling_result.pos_inds
neg_inds = sampling_result.neg_inds
# label targets
num_bboxes = bbox_pred.size(0)
labels = gt_bboxes.new_full((num_bboxes, ),
self.num_classes,
dtype=torch.long)
labels[pos_inds] = gt_labels[sampling_result.pos_assigned_gt_inds]
label_weights = gt_bboxes.new_ones(num_bboxes)
# bbox targets
# theta in gt_bbox here is still a single scalar
bbox_targets = torch.zeros_like(bbox_pred)[..., :self.code_size - 1]
bbox_weights = torch.zeros_like(bbox_pred)
# only matched query will learn from bbox coord
bbox_weights[pos_inds] = 1.0
# fix empty gt bug in multi gpu training
if sampling_result.pos_gt_bboxes.shape[0] == 0:
sampling_result.pos_gt_bboxes = \
sampling_result.pos_gt_bboxes.reshape(0, self.code_size - 1)
bbox_targets[pos_inds] = sampling_result.pos_gt_bboxes
return (labels, label_weights, bbox_targets, bbox_weights, pos_inds,
neg_inds)
def get_targets(
self,
batch_cls_scores: List[Tensor], # bs[num_q,num_cls]
batch_bbox_preds: List[Tensor], # bs[num_q,10]
batch_gt_instances_3d: InstanceList) -> tuple:
"""Compute regression and classification targets for a batch of images
for a single decoder layer.
Args:
batch_cls_scores (list[Tensor]): Box score logits from a single
decoder layer for each image with shape [num_query,
cls_out_channels].
batch_bbox_preds (list[Tensor]): Sigmoid outputs from a single
decoder layer for each image, with normalized coordinate
(cx,cy,l,w,cz,h,sin(φ),cos(φ),v_x,v_y) and
shape [num_query, 10]
batch_gt_instances_3d (list[:obj:`InstanceData`]): Batch of
gt_instances_3d. It usually includes ``bboxes_3d``, ``labels_3d``.
Returns:
tuple: a tuple containing the following targets.
- labels_list (list[Tensor]): Labels for all images.
- label_weights_list (list[Tensor]): Label weights for all \
images.
- bbox_targets_list (list[Tensor]): BBox targets for all \
images.
- bbox_weights_list (list[Tensor]): BBox weights for all \
images.
- num_total_pos (int): Number of positive samples in all \
images.
- num_total_neg (int): Number of negative samples in all \
images.
"""
(labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
pos_inds_list, neg_inds_list) = multi_apply(self._get_target_single,
batch_cls_scores,
batch_bbox_preds,
batch_gt_instances_3d)
num_total_pos = sum((inds.numel() for inds in pos_inds_list))
num_total_neg = sum((inds.numel() for inds in neg_inds_list))
return (labels_list, label_weights_list, bbox_targets_list,
bbox_weights_list, num_total_pos, num_total_neg)
def loss_by_feat_single(
self,
batch_cls_scores: Tensor, # bs,num_q,num_cls
batch_bbox_preds: Tensor, # bs,num_q,10
batch_gt_instances_3d: InstanceList
) -> Tuple[Tensor, Tensor]:
""""Loss function for outputs from a single decoder layer of a single
feature level.
Args:
batch_cls_scores (Tensor): Box score logits from a single
decoder layer for batched images with shape [num_query,
cls_out_channels].
batch_bbox_preds (Tensor): Sigmoid outputs from a single
decoder layer for batched images, with normalized coordinate
(cx,cy,l,w,cz,h,sin(φ),cos(φ),v_x,v_y) and
shape [num_query, 10]
batch_gt_instances_3d (list[:obj:`InstanceData`]): Batch of
gt_instance_3d. It usually has ``bboxes_3d``,``labels_3d``.
Returns:
tuple(Tensor, Tensor): cls and reg loss for outputs from
a single decoder layer.
"""
batch_size = batch_cls_scores.size(0) # batch size
cls_scores_list = [batch_cls_scores[i] for i in range(batch_size)]
bbox_preds_list = [batch_bbox_preds[i] for i in range(batch_size)]
cls_reg_targets = self.get_targets(cls_scores_list, bbox_preds_list,
batch_gt_instances_3d)
(labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
num_total_pos, num_total_neg) = cls_reg_targets
labels = torch.cat(labels_list, 0)
label_weights = torch.cat(label_weights_list, 0)
bbox_targets = torch.cat(bbox_targets_list, 0)
bbox_weights = torch.cat(bbox_weights_list, 0)
# classification loss
batch_cls_scores = batch_cls_scores.reshape(-1, self.cls_out_channels)
# construct weighted avg_factor to match with the official DETR repo
cls_avg_factor = num_total_pos * 1.0 + \
num_total_neg * self.bg_cls_weight
if self.sync_cls_avg_factor:
cls_avg_factor = reduce_mean(
batch_cls_scores.new_tensor([cls_avg_factor]))
cls_avg_factor = max(cls_avg_factor, 1)
loss_cls = self.loss_cls(
batch_cls_scores, labels, label_weights, avg_factor=cls_avg_factor)
# Compute the average number of gt boxes across all gpus, for
# normalization purposes
num_total_pos = loss_cls.new_tensor([num_total_pos])
num_total_pos = torch.clamp(reduce_mean(num_total_pos), min=1).item()
# regression L1 loss
batch_bbox_preds = batch_bbox_preds.reshape(-1,
batch_bbox_preds.size(-1))
normalized_bbox_targets = normalize_bbox(bbox_targets, self.pc_range)
# neg_query is all 0, log(0) is NaN
isnotnan = torch.isfinite(normalized_bbox_targets).all(dim=-1)
bbox_weights = bbox_weights * self.code_weights
loss_bbox = self.loss_bbox(
batch_bbox_preds[isnotnan, :self.code_size],
normalized_bbox_targets[isnotnan, :self.code_size],
bbox_weights[isnotnan, :self.code_size],
avg_factor=num_total_pos)
loss_cls = torch.nan_to_num(loss_cls)
loss_bbox = torch.nan_to_num(loss_bbox)
return loss_cls, loss_bbox
# original loss()
def loss_by_feat(
self,
batch_gt_instances_3d: InstanceList,
preds_dicts: Dict[str, Tensor],
batch_gt_instances_3d_ignore: OptInstanceList = None) -> Dict:
"""Compute loss of the head.
Args:
batch_gt_instances_3d (list[:obj:`InstanceData`]): Batch of
gt_instances_3d. It usually includes ``bboxes_3d``,
``labels_3d``, ``depths``, ``centers_2d`` and attributes.
batch_gt_instances_3d_ignore (list[:obj:`InstanceData`], Optional):
NOT supported.
Defaults to None.
Returns:
dict[str, Tensor]: A dictionary of loss components.
"""
assert batch_gt_instances_3d_ignore is None, \
f'{self.__class__.__name__} only supports ' \
f'batch_gt_instances_3d_ignore being None.'
all_cls_scores = preds_dicts[
'all_cls_scores'] # num_dec,bs,num_q,num_cls
all_bbox_preds = preds_dicts['all_bbox_preds'] # num_dec,bs,num_q,10
enc_cls_scores = preds_dicts['enc_cls_scores']
enc_bbox_preds = preds_dicts['enc_bbox_preds']
# calculate loss for each decoder layer
num_dec_layers = len(all_cls_scores)
batch_gt_instances_3d_list = [
batch_gt_instances_3d for _ in range(num_dec_layers)
]
losses_cls, losses_bbox = multi_apply(self.loss_by_feat_single,
all_cls_scores, all_bbox_preds,
batch_gt_instances_3d_list)
loss_dict = dict()
# loss of proposal generated from encode feature map.
if enc_cls_scores is not None:
enc_loss_cls, enc_losses_bbox = self.loss_by_feat_single(
enc_cls_scores, enc_bbox_preds, batch_gt_instances_3d_list)
loss_dict['enc_loss_cls'] = enc_loss_cls
loss_dict['enc_loss_bbox'] = enc_losses_bbox
# loss from the last decoder layer
loss_dict['loss_cls'] = losses_cls[-1]
loss_dict['loss_bbox'] = losses_bbox[-1]
# loss from other decoder layers
num_dec_layer = 0
for loss_cls_i, loss_bbox_i in zip(losses_cls[:-1], losses_bbox[:-1]):
loss_dict[f'd{num_dec_layer}.loss_cls'] = loss_cls_i
loss_dict[f'd{num_dec_layer}.loss_bbox'] = loss_bbox_i
num_dec_layer += 1
return loss_dict
def predict_by_feat(self,
preds_dicts,
img_metas,
rescale=False) -> InstanceList:
"""Transform network output for a batch into bbox predictions.
Args:
preds_dicts (Dict[str, Tensor]):
-all_cls_scores (Tensor): Outputs from the classification head,
shape [nb_dec, bs, num_query, cls_out_channels]. Note
cls_out_channels should include background.
-all_bbox_preds (Tensor): Sigmoid outputs from the regression
head with normalized coordinate format
(cx, cy, l, w, cz, h, rot_sine, rot_cosine, v_x, v_y).
Shape [nb_dec, bs, num_query, 10].
batch_img_metas (list[dict]): Meta information of each image, e.g.,
image size, scaling factor, etc.
rescale (bool): If True, return boxes in original image space.
Defaults to False.
Returns:
list[:obj:`InstanceData`]: Object detection results of each image
after the post process. Each item usually contains following keys.
- scores_3d (Tensor): Classification scores, has a shape
(num_instance, )
- labels_3d (Tensor): Labels of bboxes, has a shape
(num_instances, ).
- bboxes_3d (Tensor): Contains a tensor with shape
(num_instances, C), where C >= 7.
"""
# sinθ & cosθ ---> θ
preds_dicts = self.bbox_coder.decode(preds_dicts)
num_samples = len(preds_dicts) # batch size
ret_list = []
for i in range(num_samples):
results = InstanceData()
preds = preds_dicts[i]
bboxes = preds['bboxes']
bboxes[:, 2] = bboxes[:, 2] - bboxes[:, 5] * 0.5
bboxes = img_metas[i]['box_type_3d'](bboxes, self.code_size - 1)
results.bboxes_3d = bboxes
results.scores_3d = preds['scores']
results.labels_3d = preds['labels']
ret_list.append(results)
return ret_list
import warnings
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmcv.cnn.bricks.transformer import (TransformerLayerSequence,
build_transformer_layer_sequence)
from mmcv.ops.multi_scale_deform_attn import MultiScaleDeformableAttention
from mmengine.model import BaseModule, constant_init, xavier_init
from mmdet3d.registry import MODELS
def inverse_sigmoid(x, eps=1e-5):
"""Inverse function of sigmoid.
Args:
x (Tensor): The tensor to do the
inverse.
eps (float): EPS to avoid numerical
overflow. Defaults to 1e-5.
Returns:
Tensor: The inverse sigmoid of x, with the same
shape as the input.
"""
x = x.clamp(min=0, max=1)
x1 = x.clamp(min=eps)
x2 = (1 - x).clamp(min=eps)
return torch.log(x1 / x2)
@MODELS.register_module()
class Detr3DTransformer(BaseModule):
"""Implements the DETR3D transformer.
Args:
as_two_stage (bool): Generate query from encoder features.
Default: False.
num_feature_levels (int): Number of feature maps from FPN:
Default: 4.
num_cams (int): Number of cameras in the dataset.
Default: 6 in NuScenes Det.
two_stage_num_proposals (int): Number of proposals when set
`as_two_stage` as True. Default: 300.
"""
def __init__(self,
num_feature_levels=4,
num_cams=6,
two_stage_num_proposals=300,
decoder=None,
**kwargs):
super(Detr3DTransformer, self).__init__(**kwargs)
self.decoder = build_transformer_layer_sequence(decoder)
self.embed_dims = self.decoder.embed_dims
self.num_feature_levels = num_feature_levels
self.num_cams = num_cams
self.two_stage_num_proposals = two_stage_num_proposals
self.init_layers()
def init_layers(self):
"""Initialize layers of the Detr3DTransformer."""
self.reference_points = nn.Linear(self.embed_dims, 3)
def init_weights(self):
"""Initialize the transformer weights."""
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
for m in self.modules():
if isinstance(m, MultiScaleDeformableAttention) or isinstance(
m, Detr3DCrossAtten):
m.init_weight()
xavier_init(self.reference_points, distribution='uniform', bias=0.)
def forward(self, mlvl_feats, query_embed, reg_branches=None, **kwargs):
"""Forward function for `Detr3DTransformer`.
Args:
mlvl_feats (list(Tensor)): Input queries from
different level. Each element has shape
(B, N, C, H_lvl, W_lvl).
query_embed (Tensor): The query positional and semantic embedding
for decoder, with shape [num_query, c+c].
mlvl_pos_embeds (list(Tensor)): The positional encoding
of feats from different level, has the shape
[bs, N, embed_dims, h, w]. It is unused here.
reg_branches (obj:`nn.ModuleList`): Regression heads for
feature maps from each decoder layer. Only would
be passed when `with_box_refine` is True. Default to None.
Returns:
tuple[Tensor]: results of decoder containing the following tensor.
- inter_states: Outputs from decoder. If
return_intermediate_dec is True output has shape
(num_dec_layers, bs, num_query, embed_dims), else has
shape (1, bs, num_query, embed_dims).
- init_reference_out: The initial value of reference
points, has shape (bs, num_query, 3).
- inter_references_out: The internal value of reference
points in decoder, has shape
(num_dec_layers, bs, num_query, 3)
"""
assert query_embed is not None
bs = mlvl_feats[0].size(0)
query_pos, query = torch.split(query_embed, self.embed_dims, dim=1)
query_pos = query_pos.unsqueeze(0).expand(bs, -1, -1) # [bs,num_q,c]
query = query.unsqueeze(0).expand(bs, -1, -1) # [bs,num_q,c]
reference_points = self.reference_points(query_pos)
reference_points = reference_points.sigmoid()
init_reference_out = reference_points
# decoder
query = query.permute(1, 0, 2)
query_pos = query_pos.permute(1, 0, 2)
inter_states, inter_references = self.decoder(
query=query,
key=None,
value=mlvl_feats,
query_pos=query_pos,
reference_points=reference_points,
reg_branches=reg_branches,
**kwargs)
inter_references_out = inter_references
return inter_states, init_reference_out, inter_references_out
@MODELS.register_module()
class Detr3DTransformerDecoder(TransformerLayerSequence):
"""Implements the decoder in DETR3D transformer.
Args:
return_intermediate (bool): Whether to return intermediate outputs.
coder_norm_cfg (dict): Config of last normalization layer. Default:
`LN`.
"""
def __init__(self, *args, return_intermediate=False, **kwargs):
super(Detr3DTransformerDecoder, self).__init__(*args, **kwargs)
self.return_intermediate = return_intermediate
def forward(self,
query,
*args,
reference_points=None,
reg_branches=None,
**kwargs):
"""Forward function for `Detr3DTransformerDecoder`.
Args:
query (Tensor): Input query with shape
`(num_query, bs, embed_dims)`.
reference_points (Tensor): The reference
points of offset, with shape
(bs, num_query, 4) when as_two_stage,
otherwise (bs, num_query, 3).
reg_branches (obj:`nn.ModuleList`): Used for
refining the regression results. Only would
be passed when with_box_refine is True,
otherwise `None` would be passed.
Returns:
Tensor: Results with shape [1, num_query, bs, embed_dims] when
return_intermediate is `False`, otherwise it has shape
[num_layers, num_query, bs, embed_dims].
"""
output = query
intermediate = []
intermediate_reference_points = []
for lid, layer in enumerate(self.layers): # iterative refinement
reference_points_input = reference_points
output = layer(
output,
*args,
reference_points=reference_points_input,
**kwargs)
output = output.permute(1, 0, 2)
if reg_branches is not None:
tmp = reg_branches[lid](output)
assert reference_points.shape[-1] == 3
new_reference_points = torch.zeros_like(reference_points)
new_reference_points[..., :2] = tmp[..., :2] + inverse_sigmoid(
reference_points[..., :2])
new_reference_points[...,
2:3] = tmp[..., 4:5] + inverse_sigmoid(
reference_points[..., 2:3])
new_reference_points = new_reference_points.sigmoid()
reference_points = new_reference_points.detach()
output = output.permute(1, 0, 2)
if self.return_intermediate:
intermediate.append(output)
intermediate_reference_points.append(reference_points)
if self.return_intermediate:
return torch.stack(intermediate), torch.stack(
intermediate_reference_points)
return output, reference_points
@MODELS.register_module()
class Detr3DCrossAtten(BaseModule):
"""An attention module used in Detr3d.
Args:
embed_dims (int): The embedding dimension of Attention.
Default: 256.
num_heads (int): Parallel attention heads. Default: 8.
num_levels (int): The number of feature map used in
Attention. Default: 4.
num_points (int): The number of sampling points for
each query in each head. Default: 5.
im2col_step (int): The step used in image_to_column.
Default: 64.
dropout (float): A Dropout layer on `inp_residual`.
Default: 0.1.
init_cfg (obj:`mmcv.ConfigDict`): The Config for initialization.
Default: None.
"""
def __init__(
self,
embed_dims=256,
num_heads=8,
num_levels=4,
num_points=5,
num_cams=6,
im2col_step=64,
pc_range=None,
dropout=0.1,
norm_cfg=None,
init_cfg=None,
batch_first=False,
):
super(Detr3DCrossAtten, self).__init__(init_cfg)
if embed_dims % num_heads != 0:
raise ValueError(f'embed_dims must be divisible by num_heads, '
f'but got {embed_dims} and {num_heads}')
dim_per_head = embed_dims // num_heads
self.norm_cfg = norm_cfg
self.init_cfg = init_cfg
self.dropout = nn.Dropout(dropout)
self.pc_range = pc_range
# you'd better set dim_per_head to a power of 2
# which is more efficient in the CUDA implementation
def _is_power_of_2(n):
if (not isinstance(n, int)) or (n < 0):
raise ValueError(
'invalid input for _is_power_of_2: {} (type: {})'.format(
n, type(n)))
return (n & (n - 1) == 0) and n != 0
if not _is_power_of_2(dim_per_head):
warnings.warn(
"You'd better set embed_dims in "
'MultiScaleDeformAttention to make '
'the dimension of each attention head a power of 2 '
'which is more efficient in our CUDA implementation.')
self.im2col_step = im2col_step
self.embed_dims = embed_dims
self.num_levels = num_levels
self.num_heads = num_heads
self.num_points = num_points
self.num_cams = num_cams
self.attention_weights = nn.Linear(embed_dims,
num_cams * num_levels * num_points)
self.output_proj = nn.Linear(embed_dims, embed_dims)
self.position_encoder = nn.Sequential(
nn.Linear(3, self.embed_dims),
nn.LayerNorm(self.embed_dims),
nn.ReLU(inplace=True),
nn.Linear(self.embed_dims, self.embed_dims),
nn.LayerNorm(self.embed_dims),
nn.ReLU(inplace=True),
)
self.batch_first = batch_first
self.init_weight()
def init_weight(self):
"""Default initialization for Parameters of Module."""
constant_init(self.attention_weights, val=0., bias=0.)
xavier_init(self.output_proj, distribution='uniform', bias=0.)
def forward(self,
query,
key,
value,
residual=None,
query_pos=None,
reference_points=None,
**kwargs):
"""Forward Function of Detr3DCrossAtten.
Args:
query (Tensor): Query of Transformer with shape
(num_query, bs, embed_dims).
key (Tensor): The key tensor with shape
`(num_key, bs, embed_dims)`.
value (List[Tensor]): Image features from
different level. Each element has shape
(B, N, C, H_lvl, W_lvl).
residual (Tensor): The tensor used for addition, with the
same shape as `x`. Default None. If None, `x` will be used.
query_pos (Tensor): The positional encoding for `query`.
Default: None.
reference_points (Tensor): The normalized 3D reference
points with shape (bs, num_query, 3)
Returns:
Tensor: forwarded results with shape [num_query, bs, embed_dims].
"""
if key is None:
key = query
if value is None:
value = key
if residual is None:
inp_residual = query
if query_pos is not None:
query = query + query_pos
query = query.permute(1, 0, 2)
bs, num_query, _ = query.size()
attention_weights = self.attention_weights(query).view(
bs, 1, num_query, self.num_cams, self.num_points, self.num_levels)
reference_points_3d, output, mask = feature_sampling(
value, reference_points, self.pc_range, kwargs['img_metas'])
output = torch.nan_to_num(output)
mask = torch.nan_to_num(mask)
attention_weights = attention_weights.sigmoid() * mask
output = output * attention_weights
output = output.sum(-1).sum(-1).sum(-1)
output = output.permute(2, 0, 1)
# (num_query, bs, embed_dims)
output = self.output_proj(output)
pos_feat = self.position_encoder(
inverse_sigmoid(reference_points_3d)).permute(1, 0, 2)
return self.dropout(output) + inp_residual + pos_feat
def feature_sampling(mlvl_feats,
ref_pt,
pc_range,
img_metas,
no_sampling=False):
""" sample multi-level features by projecting 3D reference points
to 2D image
Args:
mlvl_feats (List[Tensor]): Image features from
different level. Each element has shape
(B, N, C, H_lvl, W_lvl).
ref_pt (Tensor): The normalized 3D reference
points with shape (bs, num_query, 3)
pc_range: perception range of the detector
img_metas (list[dict]): Meta information of multiple inputs
in a batch, containing `lidar2img`.
no_sampling (bool): If set 'True', the function will return
2D projected points and mask only.
Returns:
ref_pt_3d (Tensor): A copy of original ref_pt
sampled_feats (Tensor): sampled features with shape \
(B C num_q N 1 fpn_lvl)
mask (Tensor): Indicates whether the reference point \
is projected outside of the images, with shape \
(B 1 num_q N 1 1)
"""
lidar2img = [meta['lidar2img'] for meta in img_metas]
lidar2img = np.asarray(lidar2img)
lidar2img = ref_pt.new_tensor(lidar2img)
ref_pt = ref_pt.clone()
ref_pt_3d = ref_pt.clone()
B, num_query = ref_pt.size()[:2]
num_cam = lidar2img.size(1)
eps = 1e-5
ref_pt[..., 0:1] = \
ref_pt[..., 0:1] * (pc_range[3] - pc_range[0]) + pc_range[0] # x
ref_pt[..., 1:2] = \
ref_pt[..., 1:2] * (pc_range[4] - pc_range[1]) + pc_range[1] # y
ref_pt[..., 2:3] = \
ref_pt[..., 2:3] * (pc_range[5] - pc_range[2]) + pc_range[2] # z
# (B num_q 3) -> (B num_q 4) -> (B 1 num_q 4) -> (B num_cam num_q 4 1)
ref_pt = torch.cat((ref_pt, torch.ones_like(ref_pt[..., :1])), -1)
ref_pt = ref_pt.view(B, 1, num_query, 4)
ref_pt = ref_pt.repeat(1, num_cam, 1, 1).unsqueeze(-1)
# (B num_cam 4 4) -> (B num_cam num_q 4 4)
lidar2img = lidar2img.view(B, num_cam, 1, 4, 4)\
.repeat(1, 1, num_query, 1, 1)
# (... 4 4) * (... 4 1) -> (B num_cam num_q 4)
pt_cam = torch.matmul(lidar2img, ref_pt).squeeze(-1)
# (B num_cam num_q)
z = pt_cam[..., 2:3]
eps = eps * torch.ones_like(z)
mask = (z > eps)
pt_cam = pt_cam[..., 0:2] / torch.maximum(z, eps) # prevent zero-division
# padded nuScenes image: 928*1600
(h, w) = img_metas[0]['pad_shape']
pt_cam[..., 0] /= w
pt_cam[..., 1] /= h
# else:
# (h,w,_) = img_metas[0]['ori_shape'][0] # waymo image
# pt_cam[..., 0] /= w # cam0~2: 1280*1920
# pt_cam[..., 1] /= h # cam3~4: 886 *1920 padded to 1280*1920
# mask[:, 3:5, :] &= (pt_cam[:, 3:5, :, 1:2] < 0.7) # filter pt_cam_y > 886
mask = (
mask & (pt_cam[..., 0:1] > 0.0)
& (pt_cam[..., 0:1] < 1.0)
& (pt_cam[..., 1:2] > 0.0)
& (pt_cam[..., 1:2] < 1.0))
if no_sampling:
return pt_cam, mask
# (B num_cam num_q) -> (B 1 num_q num_cam 1 1)
mask = mask.view(B, num_cam, 1, num_query, 1, 1).permute(0, 2, 3, 1, 4, 5)
mask = torch.nan_to_num(mask)
pt_cam = (pt_cam - 0.5) * 2 # [0,1] to [-1,1] to do grid_sample
sampled_feats = []
for lvl, feat in enumerate(mlvl_feats):
B, N, C, H, W = feat.size()
feat = feat.view(B * N, C, H, W)
pt_cam_lvl = pt_cam.view(B * N, num_query, 1, 2)
sampled_feat = F.grid_sample(feat, pt_cam_lvl)
# (B num_cam C num_query 1) -> List of (B C num_q num_cam 1)
sampled_feat = sampled_feat.view(B, N, C, num_query, 1)
sampled_feat = sampled_feat.permute(0, 2, 3, 1, 4)
sampled_feats.append(sampled_feat)
sampled_feats = torch.stack(sampled_feats, -1)
# (B C num_q num_cam fpn_lvl)
sampled_feats = \
sampled_feats.view(B, C, num_query, num_cam, 1, len(mlvl_feats))
return ref_pt_3d, sampled_feats, mask
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
class Grid(object):
def __init__(self,
use_h,
use_w,
rotate=1,
offset=False,
ratio=0.5,
mode=0,
prob=1.):
self.use_h = use_h
self.use_w = use_w
self.rotate = rotate
self.offset = offset
self.ratio = ratio
self.mode = mode
self.st_prob = prob
self.prob = prob
def set_prob(self, epoch, max_epoch):
self.prob = self.st_prob * epoch / max_epoch
def __call__(self, img, label):
if np.random.rand() > self.prob:
return img, label
h = img.size(1)
w = img.size(2)
self.d1 = 2
self.d2 = min(h, w)
hh = int(1.5 * h)
ww = int(1.5 * w)
d = np.random.randint(self.d1, self.d2)
if self.ratio == 1:
self.L = np.random.randint(1, d)
else:
self.L = min(max(int(d * self.ratio + 0.5), 1), d - 1)
mask = np.ones((hh, ww), np.float32)
st_h = np.random.randint(d)
st_w = np.random.randint(d)
if self.use_h:
for i in range(hh // d):
s = d * i + st_h
t = min(s + self.L, hh)
mask[s:t, :] *= 0
if self.use_w:
for i in range(ww // d):
s = d * i + st_w
t = min(s + self.L, ww)
mask[:, s:t] *= 0
r = np.random.randint(self.rotate)
mask = Image.fromarray(np.uint8(mask))
mask = mask.rotate(r)
mask = np.asarray(mask)
mask = mask[(hh - h) // 2:(hh - h) // 2 + h,
(ww - w) // 2:(ww - w) // 2 + w]
mask = torch.from_numpy(mask).float()
if self.mode == 1:
mask = 1 - mask
mask = mask.expand_as(img)
if self.offset:
offset = torch.from_numpy(2 * (np.random.rand(h, w) - 0.5)).float()
offset = (1 - mask) * offset
img = img * mask + offset
else:
img = img * mask
return img, label
class GridMask(nn.Module):
def __init__(self,
use_h,
use_w,
rotate=1,
offset=False,
ratio=0.5,
mode=0,
prob=1.):
super(GridMask, self).__init__()
self.use_h = use_h
self.use_w = use_w
self.rotate = rotate
self.offset = offset
self.ratio = ratio
self.mode = mode
self.st_prob = prob
self.prob = prob
def set_prob(self, epoch, max_epoch):
self.prob = self.st_prob * epoch / max_epoch # + 1.# 0.5
def forward(self, x):
if np.random.rand() > self.prob or not self.training:
return x
n, c, h, w = x.size()
x = x.view(-1, h, w)
hh = int(1.5 * h)
ww = int(1.5 * w)
d = np.random.randint(2, h)
self.L = min(max(int(d * self.ratio + 0.5), 1), d - 1)
mask = np.ones((hh, ww), np.float32)
st_h = np.random.randint(d)
st_w = np.random.randint(d)
if self.use_h:
for i in range(hh // d):
s = d * i + st_h
t = min(s + self.L, hh)
mask[s:t, :] *= 0
if self.use_w:
for i in range(ww // d):
s = d * i + st_w
t = min(s + self.L, ww)
mask[:, s:t] *= 0
r = np.random.randint(self.rotate)
mask = Image.fromarray(np.uint8(mask))
mask = mask.rotate(r)
mask = np.asarray(mask)
mask = mask[(hh - h) // 2:(hh - h) // 2 + h,
(ww - w) // 2:(ww - w) // 2 + w]
mask = torch.from_numpy(mask).to(x)
if self.mode == 1:
mask = 1 - mask
mask = mask.expand_as(x)
if self.offset:
offset = torch.from_numpy(2 * (np.random.rand(h, w) - 0.5)).to(x)
x = x * mask + offset * (1 - mask)
else:
x = x * mask
return x.view(n, c, h, w)
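GridMask randomly zeroes a regular grid of stripes on the input images during training. A minimal usage sketch with toy data (not taken from the repo's training pipeline):

```python
# Minimal usage sketch (toy data, not the repo's training pipeline).
import torch

grid_mask = GridMask(
    use_h=True, use_w=True, rotate=1, offset=False, ratio=0.5, mode=1, prob=0.7)
grid_mask.train()                    # masking is skipped when not in training mode
imgs = torch.rand(6, 3, 928, 1600)   # e.g. 6 nuScenes cameras at padded resolution
masked = grid_mask(imgs)             # same shape, with grid cells zeroed out
assert masked.shape == imgs.shape
```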
from typing import List
import torch
from mmdet.models.task_modules.assigners import AssignResult # check
from mmdet.models.task_modules.assigners import BaseAssigner
from mmengine.structures import InstanceData
from torch import Tensor
from mmdet3d.registry import TASK_UTILS
from .util import normalize_bbox
try:
from scipy.optimize import linear_sum_assignment
except ImportError:
linear_sum_assignment = None
@TASK_UTILS.register_module()
class HungarianAssigner3D(BaseAssigner):
"""Computes one-to-one matching between predictions and ground truth.
This class computes an assignment between the targets and the predictions
based on the costs, which are a weighted sum of several components.
For DETR3D the cost is a weighted sum of the classification cost, the
regression L1 cost and the regression IoU cost. The targets don't include
the no-object class, so generally there are more predictions than targets.
After the one-to-one matching, the unmatched predictions are treated as
background. Thus each query prediction will be assigned `0` or a positive
integer indicating the ground truth index:
- 0: negative sample, no assigned gt
- positive integer: positive sample, index (1-based) of assigned gt
Args:
cls_cost (obj:`ConfigDict`): Classification cost config.
reg_cost (obj:`ConfigDict`): Regression L1 cost config.
iou_cost (obj:`ConfigDict`): Regression IoU cost config.
pc_range (List): Perception range of the detector.
"""
def __init__(self,
cls_cost=dict(type='ClassificationCost', weight=1.),
reg_cost=dict(type='BBoxL1Cost', weight=1.0),
iou_cost=dict(type='IoUCost', weight=0.0),
pc_range: List = None):
self.cls_cost = TASK_UTILS.build(cls_cost)
self.reg_cost = TASK_UTILS.build(reg_cost)
self.iou_cost = TASK_UTILS.build(iou_cost)
self.pc_range = pc_range
def assign(self,
bbox_pred: Tensor,
cls_pred: Tensor,
gt_bboxes: Tensor,
gt_labels: Tensor,
gt_bboxes_ignore=None,
eps=1e-7) -> AssignResult:
"""Computes one-to-one matching based on the weighted costs.
This method assign each query prediction to a ground truth or
background. The `assigned_gt_inds` with -1 means don't care,
0 means negative sample, and positive number is the index (1-based)
of assigned gt.
The assignment is done in the following steps, the order matters.
1. assign every prediction to -1
2. compute the weighted costs
3. do Hungarian matching on CPU based on the costs
4. assign all to 0 (background) first, then for each matched pair
between predictions and gts, treat this prediction as foreground
and assign the corresponding gt index (plus 1) to it.
Args:
bbox_pred (Tensor): Predicted boxes with normalized coordinates
(cx,cy,l,w,cz,h,sin(φ),cos(φ),v_x,v_y) which are all in
range [0, 1] and shape [num_query, 10].
cls_pred (Tensor): Predicted classification logits, shape
[num_query, num_class].
gt_bboxes (Tensor): Ground truth boxes with unnormalized
coordinates (cx,cy,cz,l,w,h,φ,v_x,v_y). Shape [num_gt, 9].
gt_labels (Tensor): Label of `gt_bboxes`, shape (num_gt,).
gt_bboxes_ignore (Tensor, optional): Ground truth bboxes that are
labelled as `ignored`. Default None.
eps (int | float, optional): unused parameter
Returns:
:obj:`AssignResult`: The assigned result.
"""
assert gt_bboxes_ignore is None, \
'Only case when gt_bboxes_ignore is None is supported.'
num_gts, num_bboxes = gt_bboxes.size(0), bbox_pred.size(0) # 9, 900
# 1. assign -1 by default
assigned_gt_inds = bbox_pred.new_full((num_bboxes, ),
-1,
dtype=torch.long)
assigned_labels = bbox_pred.new_full((num_bboxes, ),
-1,
dtype=torch.long)
if num_gts == 0 or num_bboxes == 0:
# No ground truth or boxes, return empty assignment
if num_gts == 0:
# No ground truth, assign all to background
assigned_gt_inds[:] = 0
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
# 2. compute the weighted costs
# classification and bboxcost.
# dev-1.x interface alignment
pred_instances = InstanceData(scores=cls_pred)
gt_instances = InstanceData(labels=gt_labels)
cls_cost = self.cls_cost(pred_instances, gt_instances)
# regression L1 cost
normalized_gt_bboxes = normalize_bbox(gt_bboxes, self.pc_range)
reg_cost = self.reg_cost(bbox_pred[:, :8], normalized_gt_bboxes[:, :8])
# weighted sum of above two costs
cost = cls_cost + reg_cost
# 3. do Hungarian matching on CPU using linear_sum_assignment
cost = cost.detach().cpu()
if linear_sum_assignment is None:
raise ImportError('Please run "pip install scipy" '
'to install scipy first.')
matched_row_inds, matched_col_inds = linear_sum_assignment(cost)
matched_row_inds = torch.from_numpy(matched_row_inds).to(
bbox_pred.device)
matched_col_inds = torch.from_numpy(matched_col_inds).to(
bbox_pred.device)
# 4. assign backgrounds and foregrounds
# assign all indices to backgrounds first
assigned_gt_inds[:] = 0
# assign foregrounds based on matching results
assigned_gt_inds[matched_row_inds] = matched_col_inds + 1
assigned_labels[matched_row_inds] = gt_labels[matched_col_inds]
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
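At its core the assigner runs Hungarian matching on a `(num_query, num_gt)` cost matrix and then maps the matches to 1-based ground-truth indices. A toy illustration of steps 3-4 (not repo code):

```python
# Toy illustration (not repo code) of steps 3-4: Hungarian matching on a
# small cost matrix, then converting matches to 1-based gt indices.
import torch
from scipy.optimize import linear_sum_assignment

cost = torch.tensor([[0.9, 0.1],   # 3 queries x 2 gt boxes
                     [0.4, 0.6],
                     [0.2, 0.8]])
rows, cols = linear_sum_assignment(cost.numpy())
assigned = torch.zeros(cost.size(0), dtype=torch.long)  # 0 = background
assigned[torch.from_numpy(rows)] = torch.from_numpy(cols) + 1
print(assigned)  # tensor([2, 0, 1]): query 0 -> gt 2, query 2 -> gt 1
```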
from typing import Union
import torch
from torch import Tensor
from mmdet3d.registry import TASK_UTILS
@TASK_UTILS.register_module()
class BBox3DL1Cost(object):
"""BBox3DL1Cost.
Args:
weight (Union[float, int]): Cost weight. Defaults to 1.
"""
def __init__(self, weight: Union[float, int] = 1.):
self.weight = weight
def __call__(self, bbox_pred: Tensor, gt_bboxes: Tensor) -> Tensor:
"""Compute match cost.
Args:
bbox_pred (Tensor): Predicted boxes with normalized coordinates
(cx,cy,l,w,cz,h,sin(φ),cos(φ),v_x,v_y)
which are all in range [0, 1] and shape [num_query, 10].
gt_bboxes (Tensor): Ground truth boxes with `normalized`
coordinates (cx,cy,l,w,cz,h,sin(φ),cos(φ),v_x,v_y).
Shape [num_gt, 10].
Returns:
Tensor: Match Cost matrix of shape (num_preds, num_gts).
"""
bbox_cost = torch.cdist(bbox_pred, gt_bboxes, p=1)
return bbox_cost * self.weight
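The L1 cost is just the pairwise Manhattan distance between prediction and ground-truth vectors; a quick toy check (not repo code) that `torch.cdist(..., p=1)` matches the explicit definition:

```python
# Toy check (not repo code): cdist with p=1 equals the summed absolute
# difference between every prediction / ground-truth pair.
import torch

pred = torch.rand(4, 10)  # 4 predictions, 10-dim normalized boxes
gt = torch.rand(2, 10)    # 2 ground-truth boxes
cost = torch.cdist(pred, gt, p=1)                        # (4, 2) cost matrix
ref = (pred[:, None, :] - gt[None, :, :]).abs().sum(-1)  # same thing, explicitly
assert torch.allclose(cost, ref, atol=1e-6)
```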
import torch
from mmdet.models.task_modules import BaseBBoxCoder
from mmdet3d.registry import TASK_UTILS
from .util import denormalize_bbox
@TASK_UTILS.register_module()
class NMSFreeCoder(BaseBBoxCoder):
"""Bbox coder for NMS-free detector.
Args:
pc_range (list[float]): Range of point cloud.
post_center_range (list[float]): Limit of the center.
Default: None.
max_num (int): Max number to be kept. Default: 100.
score_threshold (float): Threshold to filter boxes based on score.
Default: None.
"""
def __init__(self,
pc_range=None,
voxel_size=None,
post_center_range=None,
max_num=100,
score_threshold=None,
num_classes=10):
self.pc_range = pc_range
self.voxel_size = voxel_size
self.post_center_range = post_center_range
self.max_num = max_num
self.score_threshold = score_threshold
self.num_classes = num_classes
def encode(self):
pass
def decode_single(self, cls_scores, bbox_preds):
"""Decode bboxes.
Args:
cls_scores (Tensor): Outputs from the classification head,
shape [num_query, cls_out_channels]. Note that
cls_out_channels should include the background class.
bbox_preds (Tensor): Outputs from the regression
head with normalized coordinate
(cx, cy, l, w, cz, h, rot_sine, rot_cosine, vx, vy).
Shape [num_query, 10].
Returns:
list[dict]: Decoded boxes.
"""
max_num = self.max_num
cls_scores = cls_scores.sigmoid()
scores, indexes = cls_scores.view(-1).topk(max_num)
labels = indexes % self.num_classes
bbox_index = indexes // self.num_classes
bbox_preds = bbox_preds[bbox_index]
# [[cx, cy, cz, l, w, h, rot, vx, vy]]
final_box_preds = denormalize_bbox(bbox_preds, None)
final_scores = scores
final_preds = labels
# use score threshold
if self.score_threshold is not None:
thresh_mask = final_scores > self.score_threshold
if self.post_center_range is not None:
self.post_center_range = torch.tensor(
self.post_center_range, device=scores.device)
mask = (final_box_preds[..., :3] >=
self.post_center_range[:3]).all(1)
mask &= (final_box_preds[..., :3] <=
self.post_center_range[3:]).all(1)
if self.score_threshold:
mask &= thresh_mask
boxes3d = final_box_preds[mask]
scores = final_scores[mask]
labels = final_preds[mask]
predictions_dict = {
'bboxes': boxes3d,
'scores': scores,
'labels': labels
}
else:
raise NotImplementedError(
'Need to reorganize output as a batch, only '
'support post_center_range is not None for now!')
return predictions_dict
def decode(self, preds_dicts):
"""Decode bboxes.
Args:
preds_dicts (dict): Prediction dict with two keys:
- 'all_cls_scores' (Tensor): Outputs from the classification head,
shape [nb_dec, bs, num_query, cls_out_channels]. Note that
cls_out_channels should include the background class.
- 'all_bbox_preds' (Tensor): Sigmoid outputs from the regression
head with normalized coordinate format
(cx, cy, l, w, cz, h, rot_sine, rot_cosine, vx, vy).
Shape [nb_dec, bs, num_query, 10].
Returns:
list[dict]: Decoded boxes.
"""
# cls & reg target of last decoder layer
all_cls_scores = preds_dicts['all_cls_scores'][-1]
all_bbox_preds = preds_dicts['all_bbox_preds'][-1]
batch_size = all_cls_scores.size()[0]
predictions_list = []
for i in range(batch_size):
predictions_list.append(
self.decode_single(all_cls_scores[i], all_bbox_preds[i]))
return predictions_list
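`decode_single` takes the top-k over the flattened `(num_query, num_classes)` score matrix and recovers the query index and class label with integer division and modulo. A tiny toy demonstration (not repo code):

```python
# Toy demonstration (not repo code) of the flattened top-k trick used in
# decode_single: recover (query index, class label) from the flat indices.
import torch

num_classes = 3
scores = torch.tensor([[0.1, 0.7, 0.2],    # query 0
                       [0.9, 0.3, 0.4]])   # query 1
top_scores, flat_idx = scores.view(-1).topk(2)
labels = flat_idx % num_classes      # tensor([0, 1])
query_idx = flat_idx // num_classes  # tensor([1, 0])
print(top_scores, labels, query_idx)  # 0.9 -> query 1 / class 0, 0.7 -> query 0 / class 1
```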
from typing import List
import torch
from torch import Tensor
def normalize_bbox(bboxes: Tensor, pc_range: List) -> Tensor:
""" normalize bboxes
Args:
bboxes (Tensor): boxes with unnormalized
coordinates (cx,cy,cz,L,W,H,φ,v_x,v_y). Shape [num_gt, 9].
pc_range (List): Perception range of the detector
Returns:
normalized_bboxes (Tensor): boxes with normalized coordinate
(cx,cy,L,W,cz,H,sin(φ),cos(φ),v_x,v_y).
All in range [0, 1] and shape [num_query, 10].
"""
cx = bboxes[..., 0:1]
cy = bboxes[..., 1:2]
cz = bboxes[..., 2:3]
L = bboxes[..., 3:4].log()
W = bboxes[..., 4:5].log()
H = bboxes[..., 5:6].log()
rot = bboxes[..., 6:7]
if bboxes.size(-1) > 7:
vx = bboxes[..., 7:8]
vy = bboxes[..., 8:9]
normalized_bboxes = torch.cat(
(cx, cy, L, W, cz, H, rot.sin(), rot.cos(), vx, vy), dim=-1)
else:
normalized_bboxes = torch.cat(
(cx, cy, L, W, cz, H, rot.sin(), rot.cos()), dim=-1)
return normalized_bboxes
def denormalize_bbox(normalized_bboxes, pc_range):
""" denormalize bboxes
Args:
normalized_bboxes (Tensor): boxes with normalized coordinate
(cx,cy,L,W,cz,H,sin(φ),cos(φ),v_x,v_y).
All in range [0, 1] and shape [num_query, 10].
pc_range (List): Perception range of the detector
Returns:
denormalized_bboxes (Tensor): boxes with unnormalized
coordinates (cx,cy,cz,L,W,H,φ,v_x,v_y). Shape [num_gt, 9].
"""
# rotation
rot_sine = normalized_bboxes[..., 6:7]
rot_cosine = normalized_bboxes[..., 7:8]
rot = torch.atan2(rot_sine, rot_cosine)
# center in the bev
cx = normalized_bboxes[..., 0:1]
cy = normalized_bboxes[..., 1:2]
cz = normalized_bboxes[..., 4:5]
# size; the meaning of L, W may differ across mmdet3d versions
L = normalized_bboxes[..., 2:3]
W = normalized_bboxes[..., 3:4]
H = normalized_bboxes[..., 5:6]
L = L.exp()
W = W.exp()
H = H.exp()
if normalized_bboxes.size(-1) > 8:
# velocity
vx = normalized_bboxes[:, 8:9]
vy = normalized_bboxes[:, 9:10]
denormalized_bboxes = torch.cat([cx, cy, cz, L, W, H, rot, vx, vy],
dim=-1)
else:
denormalized_bboxes = torch.cat([cx, cy, cz, L, W, H, rot], dim=-1)
return denormalized_bboxes
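`normalize_bbox` log-scales the box sizes and encodes the yaw as a (sin, cos) pair; `denormalize_bbox` inverts both. A quick round-trip sketch with a toy box (not repo code):

```python
# Round-trip sketch (toy box, not repo code): normalize then denormalize
# recovers (cx, cy, cz, L, W, H, yaw, vx, vy) up to numerical error.
import torch

box = torch.tensor([[10.0, -5.0, 1.2, 4.5, 1.9, 1.6, 0.3, 2.0, 0.5]])
normed = normalize_bbox(box, pc_range=None)         # shape (1, 10)
restored = denormalize_bbox(normed, pc_range=None)  # shape (1, 9)
assert torch.allclose(restored, box, atol=1e-5)
```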
import warnings
from collections import OrderedDict
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmengine.model import BaseModule
from torch.nn.modules.batchnorm import _BatchNorm
from mmdet3d.registry import MODELS
VoVNet19_slim_dw_eSE = {
'stem': [64, 64, 64],
'stage_conv_ch': [64, 80, 96, 112],
'stage_out_ch': [112, 256, 384, 512],
'layer_per_block': 3,
'block_per_stage': [1, 1, 1, 1],
'eSE': True,
'dw': True
}
VoVNet19_dw_eSE = {
'stem': [64, 64, 64],
'stage_conv_ch': [128, 160, 192, 224],
'stage_out_ch': [256, 512, 768, 1024],
'layer_per_block': 3,
'block_per_stage': [1, 1, 1, 1],
'eSE': True,
'dw': True
}
VoVNet19_slim_eSE = {
'stem': [64, 64, 128],
'stage_conv_ch': [64, 80, 96, 112],
'stage_out_ch': [112, 256, 384, 512],
'layer_per_block': 3,
'block_per_stage': [1, 1, 1, 1],
'eSE': True,
'dw': False
}
VoVNet19_eSE = {
'stem': [64, 64, 128],
'stage_conv_ch': [128, 160, 192, 224],
'stage_out_ch': [256, 512, 768, 1024],
'layer_per_block': 3,
'block_per_stage': [1, 1, 1, 1],
'eSE': True,
'dw': False
}
VoVNet39_eSE = {
'stem': [64, 64, 128],
'stage_conv_ch': [128, 160, 192, 224],
'stage_out_ch': [256, 512, 768, 1024],
'layer_per_block': 5,
'block_per_stage': [1, 1, 2, 2],
'eSE': True,
'dw': False
}
VoVNet57_eSE = {
'stem': [64, 64, 128],
'stage_conv_ch': [128, 160, 192, 224],
'stage_out_ch': [256, 512, 768, 1024],
'layer_per_block': 5,
'block_per_stage': [1, 1, 4, 3],
'eSE': True,
'dw': False
}
VoVNet99_eSE = {
'stem': [64, 64, 128],
'stage_conv_ch': [128, 160, 192, 224],
'stage_out_ch': [256, 512, 768, 1024],
'layer_per_block': 5,
'block_per_stage': [1, 3, 9, 3],
'eSE': True,
'dw': False
}
_STAGE_SPECS = {
'V-19-slim-dw-eSE': VoVNet19_slim_dw_eSE,
'V-19-dw-eSE': VoVNet19_dw_eSE,
'V-19-slim-eSE': VoVNet19_slim_eSE,
'V-19-eSE': VoVNet19_eSE,
'V-39-eSE': VoVNet39_eSE,
'V-57-eSE': VoVNet57_eSE,
'V-99-eSE': VoVNet99_eSE,
}
def dw_conv3x3(in_channels,
out_channels,
module_name,
postfix,
stride=1,
kernel_size=3,
padding=1):
"""3x3 convolution with padding."""
return [
('{}_{}/dw_conv3x3'.format(module_name, postfix),
nn.Conv2d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=out_channels,
bias=False)),
('{}_{}/pw_conv1x1'.format(module_name, postfix),
nn.Conv2d(
in_channels,
out_channels,
kernel_size=1,
stride=1,
padding=0,
groups=1,
bias=False)),
('{}_{}/pw_norm'.format(module_name,
postfix), nn.BatchNorm2d(out_channels)),
('{}_{}/pw_relu'.format(module_name, postfix), nn.ReLU(inplace=True)),
]
def conv3x3(in_channels,
out_channels,
module_name,
postfix,
stride=1,
groups=1,
kernel_size=3,
padding=1):
"""3x3 convolution with padding."""
return [
(
f'{module_name}_{postfix}/conv',
nn.Conv2d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=groups,
bias=False,
),
),
(f'{module_name}_{postfix}/norm', nn.BatchNorm2d(out_channels)),
(f'{module_name}_{postfix}/relu', nn.ReLU(inplace=True)),
]
def conv1x1(in_channels,
out_channels,
module_name,
postfix,
stride=1,
groups=1,
kernel_size=1,
padding=0):
"""1x1 convolution with padding."""
return [
(
f'{module_name}_{postfix}/conv',
nn.Conv2d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=groups,
bias=False,
),
),
(f'{module_name}_{postfix}/norm', nn.BatchNorm2d(out_channels)),
(f'{module_name}_{postfix}/relu', nn.ReLU(inplace=True)),
]
class Hsigmoid(nn.Module):
def __init__(self, inplace=True):
super(Hsigmoid, self).__init__()
self.inplace = inplace
def forward(self, x):
return F.relu6(x + 3.0, inplace=self.inplace) / 6.0
class eSEModule(nn.Module):
def __init__(self, channel, reduction=4):
super(eSEModule, self).__init__()
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.fc = nn.Conv2d(channel, channel, kernel_size=1, padding=0)
self.hsigmoid = Hsigmoid()
def forward(self, x):
input = x
x = self.avg_pool(x)
x = self.fc(x)
x = self.hsigmoid(x)
return input * x
class _OSA_module(nn.Module):
def __init__(self,
in_ch,
stage_ch,
concat_ch,
layer_per_block,
module_name,
SE=False,
identity=False,
depthwise=False):
super(_OSA_module, self).__init__()
self.identity = identity
self.depthwise = depthwise
self.isReduced = False
self.layers = nn.ModuleList()
in_channel = in_ch
if self.depthwise and in_channel != stage_ch:
self.isReduced = True
self.conv_reduction = nn.Sequential(
OrderedDict(
conv1x1(in_channel, stage_ch,
'{}_reduction'.format(module_name), '0')))
for i in range(layer_per_block):
if self.depthwise:
self.layers.append(
nn.Sequential(
OrderedDict(
dw_conv3x3(stage_ch, stage_ch, module_name, i))))
else:
self.layers.append(
nn.Sequential(
OrderedDict(
conv3x3(in_channel, stage_ch, module_name, i))))
in_channel = stage_ch
# feature aggregation
in_channel = in_ch + layer_per_block * stage_ch
self.concat = nn.Sequential(
OrderedDict(conv1x1(in_channel, concat_ch, module_name, 'concat')))
self.ese = eSEModule(concat_ch)
def forward(self, x):
identity_feat = x
output = []
output.append(x)
if self.depthwise and self.isReduced:
x = self.conv_reduction(x)
for layer in self.layers:
x = layer(x)
output.append(x)
x = torch.cat(output, dim=1)
xt = self.concat(x)
xt = self.ese(xt)
if self.identity:
xt = xt + identity_feat
return xt
class _OSA_stage(nn.Sequential):
def __init__(self,
in_ch,
stage_ch,
concat_ch,
block_per_stage,
layer_per_block,
stage_num,
SE=False,
depthwise=False):
super(_OSA_stage, self).__init__()
if not stage_num == 2:
self.add_module(
'Pooling',
nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True))
if block_per_stage != 1:
SE = False
module_name = f'OSA{stage_num}_1'
self.add_module(
module_name,
_OSA_module(
in_ch,
stage_ch,
concat_ch,
layer_per_block,
module_name,
SE,
depthwise=depthwise))
for i in range(block_per_stage - 1):
if i != block_per_stage - 2:  # keep SE only for the last block
SE = False
module_name = f'OSA{stage_num}_{i + 2}'
self.add_module(
module_name,
_OSA_module(
concat_ch,
stage_ch,
concat_ch,
layer_per_block,
module_name,
SE,
identity=True,
depthwise=depthwise),
)
@MODELS.register_module()
class VoVNet(BaseModule):
def __init__(self,
spec_name,
input_ch=3,
out_features=None,
frozen_stages=-1,
norm_eval=True,
pretrained=None,
init_cfg=None):
"""
Args:
input_ch (int): Number of input channels.
out_features (list[str]): Names of the layers whose outputs should
be returned in forward. Can be any of "stem", "stage2", ...
"""
super(VoVNet, self).__init__(init_cfg)
self.frozen_stages = frozen_stages
self.norm_eval = norm_eval
if isinstance(pretrained, str):
warnings.warn('DeprecationWarning: pretrained is deprecated, '
'please use "init_cfg" instead')
self.init_cfg = dict(type='Pretrained', checkpoint=pretrained)
stage_specs = _STAGE_SPECS[spec_name]
stem_ch = stage_specs['stem']
config_stage_ch = stage_specs['stage_conv_ch']
config_concat_ch = stage_specs['stage_out_ch']
block_per_stage = stage_specs['block_per_stage']
layer_per_block = stage_specs['layer_per_block']
SE = stage_specs['eSE']
depthwise = stage_specs['dw']
self._out_features = out_features
# Stem module
conv_type = dw_conv3x3 if depthwise else conv3x3
stem = conv3x3(input_ch, stem_ch[0], 'stem', '1', 2)
stem += conv_type(stem_ch[0], stem_ch[1], 'stem', '2', 1)
stem += conv_type(stem_ch[1], stem_ch[2], 'stem', '3', 2)
self.add_module('stem', nn.Sequential(OrderedDict(stem)))
current_stride = 4
self._out_feature_strides = {
'stem': current_stride,
'stage2': current_stride
}
self._out_feature_channels = {'stem': stem_ch[2]}
stem_out_ch = [stem_ch[2]]
in_ch_list = stem_out_ch + config_concat_ch[:-1]
# OSA stages
self.stage_names = []
for i in range(4): # num_stages
name = 'stage%d' % (i + 2) # stage 2 ... stage 5
self.stage_names.append(name)
self.add_module(
name,
_OSA_stage(
in_ch_list[i],
config_stage_ch[i],
config_concat_ch[i],
block_per_stage[i],
layer_per_block,
i + 2,
SE,
depthwise,
),
)
self._out_feature_channels[name] = config_concat_ch[i]
if not i == 0:
self._out_feature_strides[name] = current_stride = int(
current_stride * 2)
# initialize weights
# self._initialize_weights()
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight)
def forward(self, x):
outputs = {}
x = self.stem(x)
if 'stem' in self._out_features:
outputs['stem'] = x
for name in self.stage_names:
x = getattr(self, name)(x)
if name in self._out_features:
outputs[name] = x
return outputs
def _freeze_stages(self):
if self.frozen_stages >= 0:
m = getattr(self, 'stem')
m.eval()
for param in m.parameters():
param.requires_grad = False
for i in range(1, self.frozen_stages + 1):
m = getattr(self, f'stage{i+1}')
m.eval()
for param in m.parameters():
param.requires_grad = False
def train(self, mode=True):
"""Convert the model into training mode while keep normalization layer
freezed."""
super(VoVNet, self).train(mode)
self._freeze_stages()
if mode and self.norm_eval:
for m in self.modules():
# trick: eval have effect on BatchNorm only
if isinstance(m, _BatchNorm):
m.eval()
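The backbone returns a dict keyed by the requested stage names, which the neck then consumes as multi-scale features. A minimal instantiation sketch with a dummy input (hypothetical shapes, not a repo config):

```python
# Minimal sketch (dummy input, not a repo config): build the V-99-eSE
# backbone and inspect the multi-scale outputs.
import torch

backbone = VoVNet(
    spec_name='V-99-eSE', out_features=['stage3', 'stage4', 'stage5'])
backbone.eval()
with torch.no_grad():
    feats = backbone(torch.rand(1, 3, 224, 224))
for name, feat in feats.items():
    print(name, tuple(feat.shape))  # e.g. stage3 (1, 512, 28, 28)
```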
from argparse import ArgumentParser
import torch
parser = ArgumentParser()
parser.add_argument('src', default='old.pth')
parser.add_argument('dst', default='new.pth')
parser.add_argument('--code_size', type=int, default=10)
args = parser.parse_args()
model = torch.load(args.src)
code_size = args.code_size
if model['meta'].get('detr3d_convert_tag') is not None:
print('this model has already been converted!')
else:
print('converting...')
# (cx, cy, w, l, cz, h, sin(φ), cos(φ), vx, vy)
for key in model['state_dict']:
tsr = model['state_dict'][key]
if 'reg_branches' in key and tsr.shape[0] == code_size:
print(key, ' with ', tsr.shape, 'has changed')
tsr[[2, 3], ...] = tsr[[3, 2], ...]
tsr[[6, 7], ...] = -tsr[[7, 6], ...]
model['meta']['detr3d_convert_tag'] = True
torch.save(model, args.dst)
print('done...')