Commit 8a64de5d authored by chenshi3

Add support for BEVFusion

parent c5dfdd71
from .convfuser import ConvFuser
__all__ = {
'ConvFuser':ConvFuser
}
import torch
from torch import nn
class ConvFuser(nn.Module):
def __init__(self,model_cfg) -> None:
super().__init__()
self.model_cfg = model_cfg
in_channel = self.model_cfg.IN_CHANNEL
out_channel = self.model_cfg.OUT_CHANNEL
self.conv = nn.Sequential(
nn.Conv2d(in_channel, out_channel, 3, padding=1, bias=False),
nn.BatchNorm2d(out_channel),
nn.ReLU(True)
)
def forward(self,batch_dict):
"""
Args:
batch_dict:
spatial_features_img (tensor): BEV features from the image modality
spatial_features (tensor): BEV features from the lidar modality
Returns:
batch_dict:
spatial_features (tensor): BEV features after multi-modal fusion
"""
img_bev = batch_dict['spatial_features_img']
lidar_bev = batch_dict['spatial_features']
cat_bev = torch.cat([img_bev,lidar_bev],dim=1)
mm_bev = self.conv(cat_bev)
batch_dict['spatial_features'] = mm_bev
return batch_dict
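# --------------------------------------------------------------------------- #
# Minimal usage sketch (illustrative only). The real config comes from the
# BEVFusion YAML; the SimpleNamespace config and channel sizes below are assumptions.
if __name__ == '__main__':
    from types import SimpleNamespace
    cfg = SimpleNamespace(IN_CHANNEL=80 + 256, OUT_CHANNEL=256)  # hypothetical channels
    fuser = ConvFuser(cfg)
    demo = {
        'spatial_features_img': torch.randn(2, 80, 180, 180),   # camera BEV features
        'spatial_features': torch.randn(2, 256, 180, 180),      # lidar BEV features
    }
    out = fuser(demo)
    print(out['spatial_features'].shape)  # torch.Size([2, 256, 180, 180])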
from .swin import SwinTransformer
__all__ = {
'SwinTransformer':SwinTransformer,
}
from .generalized_lss import GeneralizedLSSFPN
__all__ = {
'GeneralizedLSSFPN':GeneralizedLSSFPN,
}
import torch
import torch.nn as nn
import torch.nn.functional as F
from ...model_utils.basic_block_2d import BasicBlock2D
class GeneralizedLSSFPN(nn.Module):
"""
This module implements FPN, which creates pyramid features built on top of some input feature maps.
This code is adapted from https://github.com/open-mmlab/mmdetection/blob/main/mmdet/models/necks/fpn.py with minimal modifications.
"""
def __init__(self, model_cfg):
super().__init__()
self.model_cfg = model_cfg
in_channels = self.model_cfg.IN_CHANNELS
out_channels = self.model_cfg.OUT_CHANNELS
num_ins = len(in_channels)
num_outs = self.model_cfg.NUM_OUTS
start_level = self.model_cfg.START_LEVEL
end_level = self.model_cfg.END_LEVEL
self.in_channels = in_channels
if end_level == -1:
self.backbone_end_level = num_ins - 1
else:
self.backbone_end_level = end_level
assert end_level <= len(in_channels)
assert num_outs == end_level - start_level
self.start_level = start_level
self.end_level = end_level
self.lateral_convs = nn.ModuleList()
self.fpn_convs = nn.ModuleList()
for i in range(self.start_level, self.backbone_end_level):
l_conv = BasicBlock2D(
in_channels[i] + (in_channels[i + 1] if i == self.backbone_end_level - 1 else out_channels),
out_channels, kernel_size=1, bias = False
)
fpn_conv = BasicBlock2D(out_channels,out_channels, kernel_size=3, padding=1, bias = False)
self.lateral_convs.append(l_conv)
self.fpn_convs.append(fpn_conv)
def forward(self, batch_dict):
"""
Args:
batch_dict:
image_features (list[tensor]): Multi-stage features from image backbone.
Returns:
batch_dict:
image_fpn (list(tensor)): FPN features.
"""
# upsample -> cat -> conv1x1 -> conv3x3
inputs = batch_dict['image_features']
assert len(inputs) == len(self.in_channels)
# build laterals
laterals = [inputs[i + self.start_level] for i in range(len(inputs))]
# build top-down path
used_backbone_levels = len(laterals) - 1
for i in range(used_backbone_levels - 1, -1, -1):
x = F.interpolate(
laterals[i + 1],
size=laterals[i].shape[2:],
mode='bilinear', align_corners=False,
)
laterals[i] = torch.cat([laterals[i], x], dim=1)
laterals[i] = self.lateral_convs[i](laterals[i])
laterals[i] = self.fpn_convs[i](laterals[i])
# build outputs
outs = [laterals[i] for i in range(used_backbone_levels)]
batch_dict['image_fpn'] = tuple(outs)
return batch_dict
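# Illustrative dataflow sketch (comments only; the channel sizes are assumptions, not
# the shipped config). With image_features = [C2, C3, C4] from the image backbone stages,
# the top-down pass at level i does:
#   x           = upsample(laterals[i + 1]) to the spatial size of laterals[i]
#   laterals[i] = conv3x3(conv1x1(cat([laterals[i], x], dim=1)))
# e.g. a (B*N, 384, 32, 88) map fused with an upsampled (B*N, 768, 16, 44) map yields a
# (B*N, out_channels, 32, 88) output, and batch_dict['image_fpn'] collects these maps.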
# Copyright (c) OpenMMLab. All rights reserved.
"""
Mostly copy-paste from
https://github.com/open-mmlab/mmdetection/blob/main/mmdet/models/backbones/swin.py
"""
import warnings
from collections import OrderedDict
from copy import deepcopy
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.checkpoint as cp
from ..model_utils.swin_utils import swin_converter
from ..model_utils.swin_utils import PatchEmbed, PatchMerging
from ..model_utils.swin_utils import FFN, DropPath, to_2tuple, trunc_normal_, trunc_normal_init, constant_init
class WindowMSA(nn.Module):
"""Window based multi-head self-attention (W-MSA) module with relative
position bias.
Args:
embed_dims (int): Number of input channels.
num_heads (int): Number of attention heads.
window_size (tuple[int]): The height and width of the window.
qkv_bias (bool, optional): If True, add a learnable bias to q, k, v.
Default: True.
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
attn_drop_rate (float, optional): Dropout ratio of attention weight.
Default: 0.0
proj_drop_rate (float, optional): Dropout ratio of output. Default: 0.
"""
def __init__(self,
embed_dims,
num_heads,
window_size,
qkv_bias=True,
qk_scale=None,
attn_drop_rate=0.,
proj_drop_rate=0.):
super().__init__()
self._is_init = False
self.embed_dims = embed_dims
self.window_size = window_size # Wh, Ww
self.num_heads = num_heads
head_embed_dims = embed_dims // num_heads
self.scale = qk_scale or head_embed_dims**-0.5
# define a parameter table of relative position bias
self.relative_position_bias_table = nn.Parameter(
torch.zeros((2 * window_size[0] - 1) * (2 * window_size[1] - 1),
num_heads)) # 2*Wh-1 * 2*Ww-1, nH
# About 2x faster than original impl
Wh, Ww = self.window_size
rel_index_coords = self.double_step_seq(2 * Ww - 1, Wh, 1, Ww)
rel_position_index = rel_index_coords + rel_index_coords.T
rel_position_index = rel_position_index.flip(1).contiguous()
self.register_buffer('relative_position_index', rel_position_index)
self.qkv = nn.Linear(embed_dims, embed_dims * 3, bias=qkv_bias)
self.attn_drop = nn.Dropout(attn_drop_rate)
self.proj = nn.Linear(embed_dims, embed_dims)
self.proj_drop = nn.Dropout(proj_drop_rate)
self.softmax = nn.Softmax(dim=-1)
def init_weights(self):
trunc_normal_(self.relative_position_bias_table, std=0.02)
def forward(self, x, mask=None):
"""
Args:
x (tensor): input features with shape of (num_windows*B, N, C)
mask (tensor | None, Optional): mask with shape of (num_windows,
Wh*Ww, Wh*Ww), value should be between (-inf, 0].
"""
B, N, C = x.shape
qkv = self.qkv(x).reshape(B, N, 3, self.num_heads,
C // self.num_heads).permute(2, 0, 3, 1, 4)
# make torchscript happy (cannot use tensor as tuple)
q, k, v = qkv[0], qkv[1], qkv[2]
q = q * self.scale
attn = (q @ k.transpose(-2, -1))
relative_position_bias = self.relative_position_bias_table[
self.relative_position_index.view(-1)].view(
self.window_size[0] * self.window_size[1],
self.window_size[0] * self.window_size[1],
-1) # Wh*Ww,Wh*Ww,nH
relative_position_bias = relative_position_bias.permute(
2, 0, 1).contiguous() # nH, Wh*Ww, Wh*Ww
attn = attn + relative_position_bias.unsqueeze(0)
if mask is not None:
nW = mask.shape[0]
attn = attn.view(B // nW, nW, self.num_heads, N,
N) + mask.unsqueeze(1).unsqueeze(0)
attn = attn.view(-1, self.num_heads, N, N)
attn = self.softmax(attn)
attn = self.attn_drop(attn)
x = (attn @ v).transpose(1, 2).reshape(B, N, C)
x = self.proj(x)
x = self.proj_drop(x)
return x
@staticmethod
def double_step_seq(step1, len1, step2, len2):
seq1 = torch.arange(0, step1 * len1, step1)
seq2 = torch.arange(0, step2 * len2, step2)
return (seq1[:, None] + seq2[None, :]).reshape(1, -1)
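# Worked example of the relative-position index trick above (comments only):
# for a 2x2 window, double_step_seq(2*Ww - 1, Wh, 1, Ww) = double_step_seq(3, 2, 1, 2)
# gives seq1 = [0, 3], seq2 = [0, 1] and rel_index_coords = [[0, 1, 3, 4]].
# Adding its transpose and flipping along dim 1 yields the (Wh*Ww, Wh*Ww) index table
# used to gather per-head biases from relative_position_bias_table.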
class ShiftWindowMSA(nn.Module):
"""Shifted Window Multihead Self-Attention Module.
Args:
embed_dims (int): Number of input channels.
num_heads (int): Number of attention heads.
window_size (int): The height and width of the window.
shift_size (int, optional): The shift step of each window towards
right-bottom. If zero, act as regular window-msa. Defaults to 0.
qkv_bias (bool, optional): If True, add a learnable bias to q, k, v.
Default: True
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Defaults: None.
attn_drop_rate (float, optional): Dropout ratio of attention weight.
Defaults: 0.
proj_drop_rate (float, optional): Dropout ratio of output.
Defaults: 0.
dropout_layer (dict, optional): The dropout_layer used before output.
Defaults: dict(type='DropPath', drop_prob=0.).
"""
def __init__(self,
embed_dims,
num_heads,
window_size,
shift_size=0,
qkv_bias=True,
qk_scale=None,
attn_drop_rate=0,
proj_drop_rate=0,
dropout_layer=dict(type='DropPath', drop_prob=0.)):
super().__init__()
self._is_init = False
self.window_size = window_size
self.shift_size = shift_size
assert 0 <= self.shift_size < self.window_size
self.w_msa = WindowMSA(
embed_dims=embed_dims,
num_heads=num_heads,
window_size=to_2tuple(window_size),
qkv_bias=qkv_bias,
qk_scale=qk_scale,
attn_drop_rate=attn_drop_rate,
proj_drop_rate=proj_drop_rate,)
self.drop = DropPath(dropout_layer['drop_prob'])
def forward(self, query, hw_shape):
B, L, C = query.shape
H, W = hw_shape
assert L == H * W, 'input feature has wrong size'
query = query.view(B, H, W, C)
# pad feature maps to multiples of window size
pad_r = (self.window_size - W % self.window_size) % self.window_size
pad_b = (self.window_size - H % self.window_size) % self.window_size
query = F.pad(query, (0, 0, 0, pad_r, 0, pad_b))
H_pad, W_pad = query.shape[1], query.shape[2]
# cyclic shift
if self.shift_size > 0:
shifted_query = torch.roll(
query,
shifts=(-self.shift_size, -self.shift_size),
dims=(1, 2))
# calculate attention mask for SW-MSA
img_mask = torch.zeros((1, H_pad, W_pad, 1), device=query.device)
h_slices = (slice(0, -self.window_size),
slice(-self.window_size,
-self.shift_size), slice(-self.shift_size, None))
w_slices = (slice(0, -self.window_size),
slice(-self.window_size,
-self.shift_size), slice(-self.shift_size, None))
cnt = 0
for h in h_slices:
for w in w_slices:
img_mask[:, h, w, :] = cnt
cnt += 1
# nW, window_size, window_size, 1
mask_windows = self.window_partition(img_mask)
mask_windows = mask_windows.view(
-1, self.window_size * self.window_size)
attn_mask = mask_windows.unsqueeze(1) - mask_windows.unsqueeze(2)
attn_mask = attn_mask.masked_fill(attn_mask != 0,
float(-100.0)).masked_fill(
attn_mask == 0, float(0.0))
else:
shifted_query = query
attn_mask = None
# nW*B, window_size, window_size, C
query_windows = self.window_partition(shifted_query)
# nW*B, window_size*window_size, C
query_windows = query_windows.view(-1, self.window_size**2, C)
# W-MSA/SW-MSA (nW*B, window_size*window_size, C)
attn_windows = self.w_msa(query_windows, mask=attn_mask)
# merge windows
attn_windows = attn_windows.view(-1, self.window_size,
self.window_size, C)
# B H' W' C
shifted_x = self.window_reverse(attn_windows, H_pad, W_pad)
# reverse cyclic shift
if self.shift_size > 0:
x = torch.roll(
shifted_x,
shifts=(self.shift_size, self.shift_size),
dims=(1, 2))
else:
x = shifted_x
if pad_r > 0 or pad_b > 0:
x = x[:, :H, :W, :].contiguous()
x = x.view(B, H * W, C)
x = self.drop(x)
return x
def window_reverse(self, windows, H, W):
"""
Args:
windows: (num_windows*B, window_size, window_size, C)
H (int): Height of image
W (int): Width of image
Returns:
x: (B, H, W, C)
"""
window_size = self.window_size
B = int(windows.shape[0] / (H * W / window_size / window_size))
x = windows.view(B, H // window_size, W // window_size, window_size,
window_size, -1)
x = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1)
return x
def window_partition(self, x):
"""
Args:
x: (B, H, W, C)
Returns:
windows: (num_windows*B, window_size, window_size, C)
"""
B, H, W, C = x.shape
window_size = self.window_size
x = x.view(B, H // window_size, window_size, W // window_size,
window_size, C)
windows = x.permute(0, 1, 3, 2, 4, 5).contiguous()
windows = windows.view(-1, window_size, window_size, C)
return windows
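# Sanity note (comments only): window_partition followed by window_reverse with the same
# padded height/width is an exact inverse, e.g. a (B, 14, 14, C) map with window_size=7
# splits into (B*4, 7, 7, C) windows and reassembles losslessly.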
class SwinBlock(nn.Module):
""""
Args:
embed_dims (int): The feature dimension.
num_heads (int): Parallel attention heads.
feedforward_channels (int): The hidden dimension for FFNs.
window_size (int, optional): The local window scale. Default: 7.
shift (bool, optional): whether to shift window or not. Default False.
qkv_bias (bool, optional): enable bias for qkv if True. Default: True.
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
drop_rate (float, optional): Dropout rate. Default: 0.
attn_drop_rate (float, optional): Attention dropout rate. Default: 0.
drop_path_rate (float, optional): Stochastic depth rate. Default: 0.
act_cfg (dict, optional): The config dict of activation function.
Default: dict(type='GELU').
norm_cfg (dict, optional): The config dict of normalization.
Default: dict(type='LN').
with_cp (bool, optional): Use checkpoint or not. Using checkpoint
will save some memory while slowing down the training speed.
Default: False.
"""
def __init__(self,
embed_dims,
num_heads,
feedforward_channels,
window_size=7,
shift=False,
qkv_bias=True,
qk_scale=None,
drop_rate=0.,
attn_drop_rate=0.,
drop_path_rate=0.,
act_cfg=dict(type='GELU'),
norm_cfg=dict(type='LN'),
with_cp=False,):
super(SwinBlock, self).__init__()
self._is_init = False
self.with_cp = with_cp
self.norm1 = nn.LayerNorm(embed_dims)
self.attn = ShiftWindowMSA(
embed_dims=embed_dims,
num_heads=num_heads,
window_size=window_size,
shift_size=window_size // 2 if shift else 0,
qkv_bias=qkv_bias,
qk_scale=qk_scale,
attn_drop_rate=attn_drop_rate,
proj_drop_rate=drop_rate,
dropout_layer=dict(type='DropPath', drop_prob=drop_path_rate),)
self.norm2 = nn.LayerNorm(embed_dims)
self.ffn = FFN(
embed_dims=embed_dims,
feedforward_channels=feedforward_channels,
num_fcs=2,
ffn_drop=drop_rate,
dropout_layer=dict(type='DropPath', drop_prob=drop_path_rate),
act_cfg=act_cfg,
add_identity=True,)
def forward(self, x, hw_shape):
def _inner_forward(x):
identity = x
x = self.norm1(x)
x = self.attn(x, hw_shape)
x = x + identity
identity = x
x = self.norm2(x)
x = self.ffn(x, identity=identity)
return x
if self.with_cp and x.requires_grad:
x = cp.checkpoint(_inner_forward, x)
else:
x = _inner_forward(x)
return x
class SwinBlockSequence(nn.Module):
"""Implements one stage in Swin Transformer.
Args:
embed_dims (int): The feature dimension.
num_heads (int): Parallel attention heads.
feedforward_channels (int): The hidden dimension for FFNs.
depth (int): The number of blocks in this stage.
window_size (int, optional): The local window scale. Default: 7.
qkv_bias (bool, optional): enable bias for qkv if True. Default: True.
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
drop_rate (float, optional): Dropout rate. Default: 0.
attn_drop_rate (float, optional): Attention dropout rate. Default: 0.
drop_path_rate (float | list[float], optional): Stochastic depth
rate. Default: 0.
downsample (BaseModule | None, optional): The downsample operation
module. Default: None.
act_cfg (dict, optional): The config dict of activation function.
Default: dict(type='GELU').
norm_cfg (dict, optional): The config dict of normalization.
Default: dict(type='LN').
with_cp (bool, optional): Use checkpoint or not. Using checkpoint
will save some memory while slowing down the training speed.
Default: False.
"""
def __init__(self,
embed_dims,
num_heads,
feedforward_channels,
depth,
window_size=7,
qkv_bias=True,
qk_scale=None,
drop_rate=0.,
attn_drop_rate=0.,
drop_path_rate=0.,
downsample=None,
act_cfg=dict(type='GELU'),
norm_cfg=dict(type='LN'),
with_cp=False):
super().__init__()
self._is_init = False
if isinstance(drop_path_rate, list):
drop_path_rates = drop_path_rate
assert len(drop_path_rates) == depth
else:
drop_path_rates = [deepcopy(drop_path_rate) for _ in range(depth)]
self.blocks = nn.ModuleList()
for i in range(depth):
block = SwinBlock(
embed_dims=embed_dims,
num_heads=num_heads,
feedforward_channels=feedforward_channels,
window_size=window_size,
shift=False if i % 2 == 0 else True,
qkv_bias=qkv_bias,
qk_scale=qk_scale,
drop_rate=drop_rate,
attn_drop_rate=attn_drop_rate,
drop_path_rate=drop_path_rates[i],
act_cfg=act_cfg,
norm_cfg=norm_cfg,
with_cp=with_cp,)
self.blocks.append(block)
self.downsample = downsample
def forward(self, x, hw_shape):
for block in self.blocks:
x = block(x, hw_shape)
if self.downsample:
x_down, down_hw_shape = self.downsample(x, hw_shape)
return x_down, down_hw_shape, x, hw_shape
else:
return x, hw_shape, x, hw_shape
class SwinTransformer(nn.Module):
""" Swin Transformer
A PyTorch implementation of `Swin Transformer:
Hierarchical Vision Transformer using Shifted Windows` -
https://arxiv.org/abs/2103.14030
This code is adapted from https://github.com/open-mmlab/mmdetection/blob/main/mmdet/models/backbones/swin.py
with minimal modifications.
Args:
pretrain_img_size (int | tuple[int]): The size of input image when
pretrain. Defaults: 224.
in_channels (int): The num of input channels.
Defaults: 3.
embed_dims (int): The feature dimension. Default: 96.
patch_size (int | tuple[int]): Patch size. Default: 4.
window_size (int): Window size. Default: 7.
mlp_ratio (int): Ratio of mlp hidden dim to embedding dim.
Default: 4.
depths (tuple[int]): Depths of each Swin Transformer stage.
Default: (2, 2, 6, 2).
num_heads (tuple[int]): Parallel attention heads of each Swin
Transformer stage. Default: (3, 6, 12, 24).
strides (tuple[int]): The patch merging or patch embedding stride of
each Swin Transformer stage. (In swin, we set kernel size equal to
stride.) Default: (4, 2, 2, 2).
out_indices (tuple[int]): Output from which stages.
Default: (0, 1, 2, 3).
qkv_bias (bool, optional): If True, add a learnable bias to query, key,
value. Default: True
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
patch_norm (bool): Whether to add a norm layer after patch embed and patch
merging. Default: True.
drop_rate (float): Dropout rate. Defaults: 0.
attn_drop_rate (float): Attention dropout rate. Default: 0.
drop_path_rate (float): Stochastic depth rate. Defaults: 0.1.
use_abs_pos_embed (bool): If True, add absolute position embedding to
the patch embedding. Defaults: False.
act_cfg (dict): Config dict for activation layer.
Default: dict(type='GELU').
norm_cfg (dict): Config dict for normalization layer at
output of backbone. Defaults: dict(type='LN').
with_cp (bool, optional): Use checkpoint or not. Using checkpoint
will save some memory while slowing down the training speed.
Default: False.
pretrained (str, optional): model pretrained path. Default: None.
convert_weights (bool): The flag indicates whether the
pre-trained model is from the original repo. We may need
to convert some keys to make it compatible.
Default: False.
frozen_stages (int): Stages to be frozen (stop grad and set eval mode).
Default: -1 (-1 means not freezing any parameters).
init_cfg (dict, optional): The Config for initialization.
Defaults to None.
"""
def __init__(self, model_cfg):
self.model_cfg = model_cfg
pretrain_img_size = self.model_cfg.get('PRETRAIN_IMG_SIZE', 224)
init_cfg = self.model_cfg.get('INIT_CFG', None)
depths = self.model_cfg.DEPTHS
in_channels = self.model_cfg.get('IN_CHANNELS', 3)
strides = self.model_cfg.get('STRIDES', (4, 2, 2, 2))
patch_size = self.model_cfg.get('PATCH_SIZE', 4)
embed_dims = self.model_cfg.EMBED_DIMS
num_heads = self.model_cfg.NUM_HEADS
window_size = self.model_cfg.WINDOW_SIZE
mlp_ratio = self.model_cfg.MLP_RATIO
qkv_bias = self.model_cfg.get('QKV_BIAS', True)
qk_scale = self.model_cfg.get('QK_SCALE', None)
drop_rate = self.model_cfg.DROP_RATE
attn_drop_rate = self.model_cfg.ATTN_DROP_RATE
drop_path_rate = self.model_cfg.DROP_PATH_RATE
patch_norm = self.model_cfg.get('PATCH_NORM', True)
out_indices = self.model_cfg.get('OUT_INDICES', [0, 1, 2, 3])
with_cp = self.model_cfg.get('WITH_CP', False)
use_abs_pos_embed = self.model_cfg.get('USE_ABS_POS_EMBED', False)
act_cfg=dict(type='GELU')
norm_cfg=dict(type='LN')
self.convert_weights = self.model_cfg.get('CONVERT_WEIGHTS', False)
self.frozen_stages = self.model_cfg.get('FROZEN_STAGES', -1)
if isinstance(pretrain_img_size, int):
pretrain_img_size = to_2tuple(pretrain_img_size)
elif isinstance(pretrain_img_size, tuple):
if len(pretrain_img_size) == 1:
pretrain_img_size = to_2tuple(pretrain_img_size[0])
assert len(pretrain_img_size) == 2, \
f'The size of image should have length 1 or 2, ' \
f'but got {len(pretrain_img_size)}'
super(SwinTransformer, self).__init__()
self.init_cfg = init_cfg
num_layers = len(depths)
self.out_indices = out_indices
self.use_abs_pos_embed = use_abs_pos_embed
assert strides[0] == patch_size, 'Use non-overlapping patch embed.'
self.patch_embed = PatchEmbed(
in_channels=in_channels,
embed_dims=embed_dims,
conv_type='Conv2d',
kernel_size=patch_size,
stride=strides[0],
norm_cfg=norm_cfg if patch_norm else None)
if self.use_abs_pos_embed:
patch_row = pretrain_img_size[0] // patch_size
patch_col = pretrain_img_size[1] // patch_size
num_patches = patch_row * patch_col
self.absolute_pos_embed = nn.Parameter(
torch.zeros((1, num_patches, embed_dims)))
self.drop_after_pos = nn.Dropout(p=drop_rate)
# set stochastic depth decay rule
total_depth = sum(depths)
dpr = [
x.item() for x in torch.linspace(0, drop_path_rate, total_depth)
]
self.stages = nn.ModuleList()
in_channels = embed_dims
for i in range(num_layers):
if i < num_layers - 1:
downsample = PatchMerging(
in_channels=in_channels,
out_channels=2 * in_channels,
stride=strides[i + 1],
norm_cfg=norm_cfg if patch_norm else None)
else:
downsample = None
stage = SwinBlockSequence(
embed_dims=in_channels,
num_heads=num_heads[i],
feedforward_channels=mlp_ratio * in_channels,
depth=depths[i],
window_size=window_size,
qkv_bias=qkv_bias,
qk_scale=qk_scale,
drop_rate=drop_rate,
attn_drop_rate=attn_drop_rate,
drop_path_rate=dpr[sum(depths[:i]):sum(depths[:i + 1])],
downsample=downsample,
act_cfg=act_cfg,
norm_cfg=norm_cfg,
with_cp=with_cp)
self.stages.append(stage)
if downsample:
in_channels = downsample.out_channels
self.num_features = [int(embed_dims * 2**i) for i in range(num_layers)]
# Add a norm layer for each output
for i in out_indices:
layer = nn.LayerNorm(self.num_features[i])
layer_name = f'norm{i}'
self.add_module(layer_name, layer)
def train(self, mode=True):
"""Convert the model into training mode while keep layers freezed."""
super(SwinTransformer, self).train(mode)
self._freeze_stages()
def _freeze_stages(self):
if self.frozen_stages >= 0:
self.patch_embed.eval()
for param in self.patch_embed.parameters():
param.requires_grad = False
if self.use_abs_pos_embed:
self.absolute_pos_embed.requires_grad = False
self.drop_after_pos.eval()
for i in range(1, self.frozen_stages + 1):
if (i - 1) in self.out_indices:
norm_layer = getattr(self, f'norm{i-1}')
norm_layer.eval()
for param in norm_layer.parameters():
param.requires_grad = False
m = self.stages[i - 1]
m.eval()
for param in m.parameters():
param.requires_grad = False
def init_weights(self):
if self.init_cfg is None:
print(f'No pre-trained weights for '
f'{self.__class__.__name__}, '
f'training start from scratch')
if self.use_abs_pos_embed:
trunc_normal_(self.absolute_pos_embed, std=0.02)
for m in self.modules():
if isinstance(m, nn.Linear):
trunc_normal_init(m, std=.02, bias=0.)
elif isinstance(m, nn.LayerNorm):
constant_init(m, 1.0)
else:
assert 'checkpoint' in self.init_cfg, f'Only support ' \
f'specify `Pretrained` in ' \
f'`init_cfg` in ' \
f'{self.__class__.__name__} '
ckpt = torch.load(self.init_cfg.checkpoint, map_location='cpu')
if 'state_dict' in ckpt:
_state_dict = ckpt['state_dict']
elif 'model' in ckpt:
_state_dict = ckpt['model']
else:
_state_dict = ckpt
if self.convert_weights:
# support loading weights from the original repo,
_state_dict = swin_converter(_state_dict)
state_dict = OrderedDict()
for k, v in _state_dict.items():
if k.startswith('backbone.'):
state_dict[k[9:]] = v
# strip prefix of state_dict
if list(state_dict.keys())[0].startswith('module.'):
state_dict = {k[7:]: v for k, v in state_dict.items()}
# reshape absolute position embedding
if state_dict.get('absolute_pos_embed') is not None:
absolute_pos_embed = state_dict['absolute_pos_embed']
N1, L, C1 = absolute_pos_embed.size()
N2, C2, H, W = self.absolute_pos_embed.size()
if N1 != N2 or C1 != C2 or L != H * W:
print('Error in loading absolute_pos_embed, pass')
else:
state_dict['absolute_pos_embed'] = absolute_pos_embed.view(
N2, H, W, C2).permute(0, 3, 1, 2).contiguous()
# interpolate position bias table if needed
relative_position_bias_table_keys = [
k for k in state_dict.keys()
if 'relative_position_bias_table' in k
]
for table_key in relative_position_bias_table_keys:
table_pretrained = state_dict[table_key]
table_current = self.state_dict()[table_key]
L1, nH1 = table_pretrained.size()
L2, nH2 = table_current.size()
if nH1 != nH2:
print(f'Error in loading {table_key}, pass')
elif L1 != L2:
S1 = int(L1**0.5)
S2 = int(L2**0.5)
table_pretrained_resized = F.interpolate(
table_pretrained.permute(1, 0).reshape(1, nH1, S1, S1),
size=(S2, S2),
mode='bicubic')
state_dict[table_key] = table_pretrained_resized.view(
nH2, L2).permute(1, 0).contiguous()
# load state_dict
self.load_state_dict(state_dict, False)
def forward(self, batch_dict):
x = batch_dict['camera_imgs']
B, N, C, H, W = x.size()
x = x.view(B * N, C, H, W)
x, hw_shape = self.patch_embed(x)
if self.use_abs_pos_embed:
x = x + self.absolute_pos_embed
x = self.drop_after_pos(x)
outs = []
for i, stage in enumerate(self.stages):
x, hw_shape, out, out_hw_shape = stage(x, hw_shape)
if i in self.out_indices:
norm_layer = getattr(self, f'norm{i}')
out = norm_layer(out)
out = out.view(-1, *out_hw_shape,
self.num_features[i]).permute(0, 3, 1,
2).contiguous()
outs.append(out)
batch_dict['image_features'] = outs
return batch_dict
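if __name__ == '__main__':
    # Smoke-test sketch (illustrative only). A real run builds the backbone from the
    # BEVFusion YAML config; the tiny dict-with-attribute-access class and the values
    # below are assumptions chosen for a quick forward pass.
    class _Cfg(dict):
        def __getattr__(self, key):
            try:
                return self[key]
            except KeyError:
                raise AttributeError(key)
    cfg = _Cfg(DEPTHS=[2, 2, 2], EMBED_DIMS=96, NUM_HEADS=[3, 6, 12], WINDOW_SIZE=7,
               MLP_RATIO=4, DROP_RATE=0., ATTN_DROP_RATE=0., DROP_PATH_RATE=0.1,
               OUT_INDICES=[1, 2])
    backbone = SwinTransformer(cfg)
    backbone.init_weights()
    imgs = torch.randn(1, 6, 3, 256, 704)  # (batch, num_cams, C, H, W)
    out = backbone({'camera_imgs': imgs})
    for feat in out['image_features']:
        print(feat.shape)  # (6, 192, 32, 88) and (6, 384, 16, 44)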
@@ -14,6 +14,7 @@ from .mppnet_e2e import MPPNetE2E
from .pillarnet import PillarNet
from .voxelnext import VoxelNeXt
from .transfusion import TransFusion
from .bevfusion import BevFusion
__all__ = {
'Detector3DTemplate': Detector3DTemplate,
@@ -33,6 +34,7 @@ __all__ = {
'PillarNet': PillarNet,
'VoxelNeXt': VoxelNeXt,
'TransFusion': TransFusion,
'BevFusion': BevFusion,
}
from .detector3d_template import Detector3DTemplate
from .. import backbones_image, view_transforms
from ..backbones_image import img_neck
from ..backbones_2d import fuser
class BevFusion(Detector3DTemplate):
def __init__(self, model_cfg, num_class, dataset):
super().__init__(model_cfg=model_cfg, num_class=num_class, dataset=dataset)
self.module_topology = [
'vfe', 'backbone_3d', 'map_to_bev_module', 'pfe',
'image_backbone','neck','vtransform','fuser',
'backbone_2d', 'dense_head', 'point_head', 'roi_head'
]
self.module_list = self.build_networks()
def build_neck(self,model_info_dict):
if self.model_cfg.get('NECK', None) is None:
return None, model_info_dict
neck_module = img_neck.__all__[self.model_cfg.NECK.NAME](
model_cfg=self.model_cfg.NECK
)
model_info_dict['module_list'].append(neck_module)
return neck_module, model_info_dict
def build_vtransform(self,model_info_dict):
if self.model_cfg.get('VTRANSFORM', None) is None:
return None, model_info_dict
vtransform_module = view_transforms.__all__[self.model_cfg.VTRANSFORM.NAME](
model_cfg=self.model_cfg.VTRANSFORM
)
model_info_dict['module_list'].append(vtransform_module)
return vtransform_module, model_info_dict
def build_image_backbone(self, model_info_dict):
if self.model_cfg.get('IMAGE_BACKBONE', None) is None:
return None, model_info_dict
image_backbone_module = backbones_image.__all__[self.model_cfg.IMAGE_BACKBONE.NAME](
model_cfg=self.model_cfg.IMAGE_BACKBONE
)
image_backbone_module.init_weights()
model_info_dict['module_list'].append(image_backbone_module)
return image_backbone_module, model_info_dict
def build_fuser(self, model_info_dict):
if self.model_cfg.get('FUSER', None) is None:
return None, model_info_dict
fuser_module = fuser.__all__[self.model_cfg.FUSER.NAME](
model_cfg=self.model_cfg.FUSER
)
model_info_dict['module_list'].append(fuser_module)
model_info_dict['num_bev_features'] = self.model_cfg.FUSER.OUT_CHANNEL
return fuser_module, model_info_dict
def forward(self, batch_dict):
for i,cur_module in enumerate(self.module_list):
batch_dict = cur_module(batch_dict)
if self.training:
loss, tb_dict, disp_dict = self.get_training_loss(batch_dict)
ret_dict = {
'loss': loss
}
return ret_dict, tb_dict, disp_dict
else:
pred_dicts, recall_dicts = self.post_processing(batch_dict)
return pred_dicts, recall_dicts
def get_training_loss(self,batch_dict):
disp_dict = {}
loss_trans, tb_dict = batch_dict['loss'],batch_dict['tb_dict']
tb_dict = {
'loss_trans': loss_trans.item(),
**tb_dict
}
loss = loss_trans
return loss, tb_dict, disp_dict
def post_processing(self, batch_dict):
post_process_cfg = self.model_cfg.POST_PROCESSING
batch_size = batch_dict['batch_size']
final_pred_dict = batch_dict['final_box_dicts']
recall_dict = {}
for index in range(batch_size):
pred_boxes = final_pred_dict[index]['pred_boxes']
recall_dict = self.generate_recall_record(
box_preds=pred_boxes,
recall_dict=recall_dict, batch_index=index, data_dict=batch_dict,
thresh_list=post_process_cfg.RECALL_THRESH_LIST
)
return final_pred_dict, recall_dict
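# Pipeline sketch (comments only): each entry of module_topology consumes and extends
# batch_dict in order, roughly
#   vfe / backbone_3d / map_to_bev_module / pfe -> lidar BEV features ('spatial_features')
#   image_backbone (SwinTransformer)            -> 'image_features'
#   neck (GeneralizedLSSFPN)                    -> 'image_fpn'
#   vtransform (DepthLSSTransform)              -> 'spatial_features_img'
#   fuser (ConvFuser)                           -> fused 'spatial_features'
#   backbone_2d / dense_head                    -> 'final_box_dicts' used by post_processing.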
"""
Mostly copy-paste from
https://github.com/open-mmlab/mmdetection/blob/ecac3a77becc63f23d9f6980b2a36f86acd00a8a/mmdet/models/layers/transformer/utils.py
"""
import copy
import math
import warnings
import collections.abc
from collections import OrderedDict
from itertools import repeat
from typing import Sequence
import torch
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
# From PyTorch internals
def _ntuple(n):
def parse(x):
if isinstance(x, collections.abc.Iterable):
return x
return tuple(repeat(x, n))
return parse
to_2tuple = _ntuple(2)
def constant_init(module: nn.Module, val: float, bias: float = 0) -> None:
if hasattr(module, 'weight') and module.weight is not None:
nn.init.constant_(module.weight, val)
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias)
def trunc_normal_init(module: nn.Module,
mean: float = 0,
std: float = 1,
a: float = -2,
b: float = 2,
bias: float = 0) -> None:
if hasattr(module, 'weight') and module.weight is not None:
trunc_normal_(module.weight, mean, std, a, b) # type: ignore
if hasattr(module, 'bias') and module.bias is not None:
nn.init.constant_(module.bias, bias) # type: ignore
def _no_grad_trunc_normal_(tensor: Tensor, mean: float, std: float, a: float,
b: float) -> Tensor:
# Method based on
# https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
# Modified from
# https://github.com/pytorch/pytorch/blob/master/torch/nn/init.py
def norm_cdf(x):
# Computes standard normal cumulative distribution function
return (1. + math.erf(x / math.sqrt(2.))) / 2.
if (mean < a - 2 * std) or (mean > b + 2 * std):
warnings.warn(
'mean is more than 2 std from [a, b] in nn.init.trunc_normal_. '
'The distribution of values may be incorrect.',
stacklevel=2)
with torch.no_grad():
# Values are generated by using a truncated uniform distribution and
# then using the inverse CDF for the normal distribution.
# Get upper and lower cdf values
lower = norm_cdf((a - mean) / std)
upper = norm_cdf((b - mean) / std)
# Uniformly fill tensor with values from [lower, upper], then translate
# to [2lower-1, 2upper-1].
tensor.uniform_(2 * lower - 1, 2 * upper - 1)
# Use inverse cdf transform for normal distribution to get truncated
# standard normal
tensor.erfinv_()
# Transform to proper mean, std
tensor.mul_(std * math.sqrt(2.))
tensor.add_(mean)
# Clamp to ensure it's in the proper range
tensor.clamp_(min=a, max=b)
return tensor
def trunc_normal_(tensor: Tensor,
mean: float = 0.,
std: float = 1.,
a: float = -2.,
b: float = 2.) -> Tensor:
r"""Fills the input Tensor with values drawn from a truncated
normal distribution. The values are effectively drawn from the
normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)`
with values outside :math:`[a, b]` redrawn until they are within
the bounds. The method used for generating the random values works
best when :math:`a \leq \text{mean} \leq b`.
Modified from
https://github.com/pytorch/pytorch/blob/master/torch/nn/init.py
Args:
tensor (``torch.Tensor``): an n-dimensional `torch.Tensor`.
mean (float): the mean of the normal distribution.
std (float): the standard deviation of the normal distribution.
a (float): the minimum cutoff value.
b (float): the maximum cutoff value.
"""
return _no_grad_trunc_normal_(tensor, mean, std, a, b)
def drop_path(x: torch.Tensor,
drop_prob: float = 0.,
training: bool = False) -> torch.Tensor:
"""Drop paths (Stochastic Depth) per sample (when applied in main path of
residual blocks).
We follow the implementation
https://github.com/rwightman/pytorch-image-models/blob/a2727c1bf78ba0d7b5727f5f95e37fb7f8866b1f/timm/models/layers/drop.py # noqa: E501
"""
if drop_prob == 0. or not training:
return x
keep_prob = 1 - drop_prob
# handle tensors with different dimensions, not just 4D tensors.
shape = (x.shape[0], ) + (1, ) * (x.ndim - 1)
random_tensor = keep_prob + torch.rand(
shape, dtype=x.dtype, device=x.device)
output = x.div(keep_prob) * random_tensor.floor()
return output
class DropPath(nn.Module):
"""Drop paths (Stochastic Depth) per sample (when applied in main path of
residual blocks).
We follow the implementation
https://github.com/rwightman/pytorch-image-models/blob/a2727c1bf78ba0d7b5727f5f95e37fb7f8866b1f/timm/models/layers/drop.py # noqa: E501
Args:
drop_prob (float): Probability of the path to be zeroed. Default: 0.1
"""
def __init__(self, drop_prob: float = 0.1):
super().__init__()
self.drop_prob = drop_prob
def forward(self, x: torch.Tensor) -> torch.Tensor:
return drop_path(x, self.drop_prob, self.training)
class FFN(nn.Module):
"""Implements feed-forward networks (FFNs) with identity connection.
Args:
embed_dims (int): The feature dimension. Same as
`MultiheadAttention`. Defaults: 256.
feedforward_channels (int): The hidden dimension of FFNs.
Defaults: 1024.
num_fcs (int, optional): The number of fully-connected layers in
FFNs. Default: 2.
act_cfg (dict, optional): The activation config for FFNs.
Default: dict(type='ReLU')
ffn_drop (float, optional): Probability of an element to be
zeroed in FFN. Default 0.0.
add_identity (bool, optional): Whether to add the
identity connection. Default: `True`.
dropout_layer (obj:`ConfigDict`): The dropout_layer used
when adding the shortcut.
init_cfg (obj:`mmcv.ConfigDict`): The Config for initialization.
Default: None.
"""
def __init__(self,
embed_dims=256,
feedforward_channels=1024,
num_fcs=2,
act_cfg=dict(type='ReLU', inplace=True),
ffn_drop=0.,
dropout_layer=None,
add_identity=True,
init_cfg=None,
**kwargs):
super().__init__()
self._is_init = False
self.init_cfg = copy.deepcopy(init_cfg)
assert num_fcs >= 2, 'num_fcs should be no less ' \
f'than 2. got {num_fcs}.'
self.embed_dims = embed_dims
self.feedforward_channels = feedforward_channels
self.num_fcs = num_fcs
self.act_cfg = act_cfg
# ignore act_cfg, default GELU
self.activate = nn.GELU()
layers = []
in_channels = embed_dims
for _ in range(num_fcs - 1):
layers.append(
nn.Sequential(
nn.Linear(in_channels, feedforward_channels), self.activate,
nn.Dropout(ffn_drop)))
in_channels = feedforward_channels
layers.append(nn.Linear(feedforward_channels, embed_dims))
layers.append(nn.Dropout(ffn_drop))
self.layers = nn.Sequential(*layers)
self.dropout_layer = DropPath(dropout_layer['drop_prob'])
self.add_identity = add_identity
def forward(self, x, identity=None):
"""Forward function for `FFN`.
The function adds x to the output tensor if `identity` is None.
"""
out = self.layers(x)
if not self.add_identity:
return self.dropout_layer(out)
if identity is None:
identity = x
return identity + self.dropout_layer(out)
def nlc_to_nchw(x, hw_shape):
"""Convert [N, L, C] shape tensor to [N, C, H, W] shape tensor.
Args:
x (Tensor): The input tensor of shape [N, L, C] before conversion.
hw_shape (Sequence[int]): The height and width of output feature map.
Returns:
Tensor: The output tensor of shape [N, C, H, W] after conversion.
"""
H, W = hw_shape
assert len(x.shape) == 3
B, L, C = x.shape
assert L == H * W, 'The seq_len does not match H, W'
return x.transpose(1, 2).reshape(B, C, H, W).contiguous()
def nchw_to_nlc(x):
"""Flatten [N, C, H, W] shape tensor to [N, L, C] shape tensor.
Args:
x (Tensor): The input tensor of shape [N, C, H, W] before conversion.
Returns:
Tensor: The output tensor of shape [N, L, C] after conversion.
"""
assert len(x.shape) == 4
return x.flatten(2).transpose(1, 2).contiguous()
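# Round-trip note (comments only): for x of shape (2, 96, 16, 40),
#   nchw_to_nlc(x) has shape (2, 640, 96), and
#   nlc_to_nchw(nchw_to_nlc(x), (16, 40)) restores the original (2, 96, 16, 40) layout.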
class AdaptivePadding(nn.Module):
"""Applies padding to input (if needed) so that input can get fully covered
by filter you specified. It support two modes "same" and "corner". The
"same" mode is same with "SAME" padding mode in TensorFlow, pad zero around
input. The "corner" mode would pad zero to bottom right.
Args:
kernel_size (int | tuple): Size of the kernel:
stride (int | tuple): Stride of the filter. Default: 1:
dilation (int | tuple): Spacing between kernel elements.
Default: 1
padding (str): Support "same" and "corner", "corner" mode
would pad zero to bottom right, and "same" mode would
pad zero around input. Default: "corner".
Example:
>>> kernel_size = 16
>>> stride = 16
>>> dilation = 1
>>> input = torch.rand(1, 1, 15, 17)
>>> adap_pad = AdaptivePadding(
>>> kernel_size=kernel_size,
>>> stride=stride,
>>> dilation=dilation,
>>> padding="corner")
>>> out = adap_pad(input)
>>> assert (out.shape[2], out.shape[3]) == (16, 32)
>>> input = torch.rand(1, 1, 16, 17)
>>> out = adap_pad(input)
>>> assert (out.shape[2], out.shape[3]) == (16, 32)
"""
def __init__(self, kernel_size=1, stride=1, dilation=1, padding='corner'):
super(AdaptivePadding, self).__init__()
assert padding in ('same', 'corner')
kernel_size = to_2tuple(kernel_size)
stride = to_2tuple(stride)
padding = to_2tuple(padding)
dilation = to_2tuple(dilation)
self.padding = padding
self.kernel_size = kernel_size
self.stride = stride
self.dilation = dilation
def get_pad_shape(self, input_shape):
input_h, input_w = input_shape
kernel_h, kernel_w = self.kernel_size
stride_h, stride_w = self.stride
output_h = math.ceil(input_h / stride_h)
output_w = math.ceil(input_w / stride_w)
pad_h = max((output_h - 1) * stride_h +
(kernel_h - 1) * self.dilation[0] + 1 - input_h, 0)
pad_w = max((output_w - 1) * stride_w +
(kernel_w - 1) * self.dilation[1] + 1 - input_w, 0)
return pad_h, pad_w
def forward(self, x):
pad_h, pad_w = self.get_pad_shape(x.size()[-2:])
if pad_h > 0 or pad_w > 0:
if self.padding == 'corner':
x = F.pad(x, [0, pad_w, 0, pad_h])
elif self.padding == 'same':
x = F.pad(x, [
pad_w // 2, pad_w - pad_w // 2, pad_h // 2,
pad_h - pad_h // 2
])
return x
class PatchEmbed(nn.Module):
"""Image to Patch Embedding.
We use a conv layer to implement PatchEmbed.
Args:
in_channels (int): The num of input channels. Default: 3
embed_dims (int): The dimensions of embedding. Default: 768
conv_type (str): The type of convolution used for the embedding
conv layer. Default: "Conv2d".
kernel_size (int): The kernel_size of embedding conv. Default: 16.
stride (int): The slide stride of embedding conv.
Default: None (Would be set as `kernel_size`).
padding (int | tuple | string ): The padding length of
embedding conv. When it is a string, it means the mode
of adaptive padding, support "same" and "corner" now.
Default: "corner".
dilation (int): The dilation rate of embedding conv. Default: 1.
bias (bool): Bias of embed conv. Default: True.
norm_cfg (dict, optional): Config dict for normalization layer.
Default: None.
input_size (int | tuple | None): The size of input, which will be
used to calculate the out size. Only work when `dynamic_size`
is False. Default: None.
init_cfg (`mmcv.ConfigDict`, optional): The Config for initialization.
Default: None.
"""
def __init__(
self,
in_channels=3,
embed_dims=768,
conv_type='Conv2d',
kernel_size=16,
stride=16,
padding='corner',
dilation=1,
bias=True,
norm_cfg=None,
input_size=None,
init_cfg=None,
):
super(PatchEmbed, self).__init__()
self._is_init = False
self.init_cfg = copy.deepcopy(init_cfg)
self.embed_dims = embed_dims
if stride is None:
stride = kernel_size
kernel_size = to_2tuple(kernel_size)
stride = to_2tuple(stride)
dilation = to_2tuple(dilation)
if isinstance(padding, str):
self.adap_padding = AdaptivePadding(
kernel_size=kernel_size,
stride=stride,
dilation=dilation,
padding=padding)
# disable the padding of conv
padding = 0
else:
self.adap_padding = None
padding = to_2tuple(padding)
self.projection = nn.Conv2d(
in_channels=in_channels,
out_channels=embed_dims,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias)
if norm_cfg is not None:
self.norm = nn.LayerNorm(embed_dims)
else:
self.norm = None
if input_size:
input_size = to_2tuple(input_size)
# `init_out_size` would be used outside to
# calculate the num_patches
# when `use_abs_pos_embed` outside
self.init_input_size = input_size
if self.adap_padding:
pad_h, pad_w = self.adap_padding.get_pad_shape(input_size)
input_h, input_w = input_size
input_h = input_h + pad_h
input_w = input_w + pad_w
input_size = (input_h, input_w)
# https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
h_out = (input_size[0] + 2 * padding[0] - dilation[0] *
(kernel_size[0] - 1) - 1) // stride[0] + 1
w_out = (input_size[1] + 2 * padding[1] - dilation[1] *
(kernel_size[1] - 1) - 1) // stride[1] + 1
self.init_out_size = (h_out, w_out)
else:
self.init_input_size = None
self.init_out_size = None
def forward(self, x):
"""
Args:
x (Tensor): Has shape (B, C, H, W). In most case, C is 3.
Returns:
tuple: Contains merged results and its spatial shape.
- x (Tensor): Has shape (B, out_h * out_w, embed_dims)
- out_size (tuple[int]): Spatial shape of x, arrange as
(out_h, out_w).
"""
if self.adap_padding:
x = self.adap_padding(x)
x = self.projection(x)
out_size = (x.shape[2], x.shape[3])
x = x.flatten(2).transpose(1, 2)
if self.norm is not None:
x = self.norm(x)
return x, out_size
class PatchMerging(nn.Module):
"""Merge patch feature map.
This layer groups the feature map by kernel_size and applies norm and linear
layers to the grouped feature map. Our implementation uses `nn.Unfold` to
merge patches, which is about 25% faster than the original implementation;
however, pretrained models need to be converted for compatibility.
Args:
in_channels (int): The num of input channels.
out_channels (int): The num of output channels.
kernel_size (int | tuple, optional): the kernel size in the unfold
layer. Defaults to 2.
stride (int | tuple, optional): the stride of the sliding blocks in the
unfold layer. Default: None. (Would be set as `kernel_size`)
padding (int | tuple | string ): The padding length of
embedding conv. When it is a string, it means the mode
of adaptive padding, support "same" and "corner" now.
Default: "corner".
dilation (int | tuple, optional): dilation parameter in the unfold
layer. Default: 1.
bias (bool, optional): Whether to add bias in linear layer or not.
Defaults: False.
norm_cfg (dict, optional): Config dict for normalization layer.
Default: dict(type='LN').
init_cfg (dict, optional): The extra config for initialization.
Default: None.
"""
def __init__(self,
in_channels,
out_channels,
kernel_size=2,
stride=None,
padding='corner',
dilation=1,
bias=False,
norm_cfg=dict(type='LN'),
init_cfg=None):
super().__init__()
self._is_init = False
self.init_cfg = copy.deepcopy(init_cfg)
self.in_channels = in_channels
self.out_channels = out_channels
if stride:
stride = stride
else:
stride = kernel_size
kernel_size = to_2tuple(kernel_size)
stride = to_2tuple(stride)
dilation = to_2tuple(dilation)
if isinstance(padding, str):
self.adap_padding = AdaptivePadding(
kernel_size=kernel_size,
stride=stride,
dilation=dilation,
padding=padding)
# disable the padding of unfold
padding = 0
else:
self.adap_padding = None
padding = to_2tuple(padding)
self.sampler = nn.Unfold(
kernel_size=kernel_size,
dilation=dilation,
padding=padding,
stride=stride)
sample_dim = kernel_size[0] * kernel_size[1] * in_channels
if norm_cfg is not None:
self.norm = nn.LayerNorm(sample_dim)
else:
self.norm = None
self.reduction = nn.Linear(sample_dim, out_channels, bias=bias)
def forward(self, x, input_size):
"""
Args:
x (Tensor): Has shape (B, H*W, C_in).
input_size (tuple[int]): The spatial shape of x, arrange as (H, W).
Default: None.
Returns:
tuple: Contains merged results and its spatial shape.
- x (Tensor): Has shape (B, Merged_H * Merged_W, C_out)
- out_size (tuple[int]): Spatial shape of x, arrange as
(Merged_H, Merged_W).
"""
B, L, C = x.shape
assert isinstance(input_size, Sequence), f'Expect ' \
f'input_size is ' \
f'`Sequence` ' \
f'but get {input_size}'
H, W = input_size
assert L == H * W, 'input feature has wrong size'
x = x.view(B, H, W, C).permute([0, 3, 1, 2]) # B, C, H, W
# Use nn.Unfold to merge patch. About 25% faster than original method,
# but need to modify pretrained model for compatibility
if self.adap_padding:
x = self.adap_padding(x)
H, W = x.shape[-2:]
x = self.sampler(x)
# if kernel_size=2 and stride=2, x should have shape (B, 4*C, H/2*W/2)
out_h = (H + 2 * self.sampler.padding[0] - self.sampler.dilation[0] *
(self.sampler.kernel_size[0] - 1) -
1) // self.sampler.stride[0] + 1
out_w = (W + 2 * self.sampler.padding[1] - self.sampler.dilation[1] *
(self.sampler.kernel_size[1] - 1) -
1) // self.sampler.stride[1] + 1
output_size = (out_h, out_w)
x = x.transpose(1, 2) # B, H/2*W/2, 4*C
x = self.norm(x) if self.norm else x
x = self.reduction(x)
return x, output_size
def inverse_sigmoid(x, eps=1e-5):
"""Inverse function of sigmoid.
Args:
x (Tensor): The tensor to do the
inverse.
eps (float): EPS avoid numerical
overflow. Defaults 1e-5.
Returns:
Tensor: The x has passed the inverse
function of sigmoid, has same
shape with input.
"""
x = x.clamp(min=0, max=1)
x1 = x.clamp(min=eps)
x2 = (1 - x).clamp(min=eps)
return torch.log(x1 / x2)
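# Quick check (comments only): inverse_sigmoid(torch.sigmoid(t)) recovers t up to the
# eps clamping, e.g. t = torch.tensor([-2., 0., 3.]) round-trips to the same values.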
def swin_converter(ckpt):
new_ckpt = OrderedDict()
def correct_unfold_reduction_order(x):
out_channel, in_channel = x.shape
x = x.reshape(out_channel, 4, in_channel // 4)
x = x[:, [0, 2, 1, 3], :].transpose(1,
2).reshape(out_channel, in_channel)
return x
def correct_unfold_norm_order(x):
in_channel = x.shape[0]
x = x.reshape(4, in_channel // 4)
x = x[[0, 2, 1, 3], :].transpose(0, 1).reshape(in_channel)
return x
for k, v in ckpt.items():
if k.startswith('head'):
continue
elif k.startswith('layers'):
new_v = v
if 'attn.' in k:
new_k = k.replace('attn.', 'attn.w_msa.')
elif 'mlp.' in k:
if 'mlp.fc1.' in k:
new_k = k.replace('mlp.fc1.', 'ffn.layers.0.0.')
elif 'mlp.fc2.' in k:
new_k = k.replace('mlp.fc2.', 'ffn.layers.1.')
else:
new_k = k.replace('mlp.', 'ffn.')
elif 'downsample' in k:
new_k = k
if 'reduction.' in k:
new_v = correct_unfold_reduction_order(v)
elif 'norm.' in k:
new_v = correct_unfold_norm_order(v)
else:
new_k = k
new_k = new_k.replace('layers', 'stages', 1)
elif k.startswith('patch_embed'):
new_v = v
if 'proj' in k:
new_k = k.replace('proj', 'projection')
else:
new_k = k
else:
new_v = v
new_k = k
new_ckpt['backbone.' + new_k] = new_v
return new_ckpt
from .depth_lss import DepthLSSTransform
__all__ = {
'DepthLSSTransform': DepthLSSTransform,
}
import torch
from torch import nn
from pcdet.ops.bev_pool import bev_pool
def gen_dx_bx(xbound, ybound, zbound):
dx = torch.Tensor([row[2] for row in [xbound, ybound, zbound]])
bx = torch.Tensor([row[0] + row[2] / 2.0 for row in [xbound, ybound, zbound]])
nx = torch.LongTensor(
[(row[1] - row[0]) / row[2] for row in [xbound, ybound, zbound]]
)
return dx, bx, nx
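# Worked example (comments only; the bound values are assumptions, not the shipped config):
#   xbound = [-54.0, 54.0, 0.3] -> dx[0] = 0.3 (voxel size), bx[0] = -53.85 (center of the
#   first voxel), nx[0] = 360 (number of BEV cells along x).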
class DepthLSSTransform(nn.Module):
"""
This module implements LSS, which lifts images into 3D and then splats them onto BEV features.
This code is adapted from https://github.com/mit-han-lab/bevfusion/ with minimal modifications.
"""
def __init__(self, model_cfg):
super().__init__()
self.model_cfg = model_cfg
in_channel = self.model_cfg.IN_CHANNEL
out_channel = self.model_cfg.OUT_CHANNEL
self.image_size = self.model_cfg.IMAGE_SIZE
self.feature_size = self.model_cfg.FEATURE_SIZE
xbound = self.model_cfg.XBOUND
ybound = self.model_cfg.YBOUND
zbound = self.model_cfg.ZBOUND
self.dbound = self.model_cfg.DBOUND
downsample = self.model_cfg.DOWNSAMPLE
dx, bx, nx = gen_dx_bx(xbound, ybound, zbound)
self.dx = nn.Parameter(dx, requires_grad=False)
self.bx = nn.Parameter(bx, requires_grad=False)
self.nx = nn.Parameter(nx, requires_grad=False)
self.C = out_channel
self.frustum = self.create_frustum()
self.D = self.frustum.shape[0]
self.dtransform = nn.Sequential(
nn.Conv2d(1, 8, 1),
nn.BatchNorm2d(8),
nn.ReLU(True),
nn.Conv2d(8, 32, 5, stride=4, padding=2),
nn.BatchNorm2d(32),
nn.ReLU(True),
nn.Conv2d(32, 64, 5, stride=2, padding=2),
nn.BatchNorm2d(64),
nn.ReLU(True),
)
self.depthnet = nn.Sequential(
nn.Conv2d(in_channel + 64, in_channel, 3, padding=1),
nn.BatchNorm2d(in_channel),
nn.ReLU(True),
nn.Conv2d(in_channel, in_channel, 3, padding=1),
nn.BatchNorm2d(in_channel),
nn.ReLU(True),
nn.Conv2d(in_channel, self.D + self.C, 1),
)
if downsample > 1:
assert downsample == 2, downsample
self.downsample = nn.Sequential(
nn.Conv2d(out_channel, out_channel, 3, padding=1, bias=False),
nn.BatchNorm2d(out_channel),
nn.ReLU(True),
nn.Conv2d(out_channel, out_channel, 3, stride=downsample, padding=1, bias=False),
nn.BatchNorm2d(out_channel),
nn.ReLU(True),
nn.Conv2d(out_channel, out_channel, 3, padding=1, bias=False),
nn.BatchNorm2d(out_channel),
nn.ReLU(True),
)
else:
self.downsample = nn.Identity()
def create_frustum(self):
iH, iW = self.image_size
fH, fW = self.feature_size
ds = torch.arange(*self.dbound, dtype=torch.float).view(-1, 1, 1).expand(-1, fH, fW)
D, _, _ = ds.shape
xs = torch.linspace(0, iW - 1, fW, dtype=torch.float).view(1, 1, fW).expand(D, fH, fW)
ys = torch.linspace(0, iH - 1, fH, dtype=torch.float).view(1, fH, 1).expand(D, fH, fW)
frustum = torch.stack((xs, ys, ds), -1)
return nn.Parameter(frustum, requires_grad=False)
def get_geometry(self, camera2lidar_rots, camera2lidar_trans, intrins, post_rots, post_trans, **kwargs):
camera2lidar_rots = camera2lidar_rots.to(torch.float)
camera2lidar_trans = camera2lidar_trans.to(torch.float)
intrins = intrins.to(torch.float)
post_rots = post_rots.to(torch.float)
post_trans = post_trans.to(torch.float)
B, N, _ = camera2lidar_trans.shape
# undo post-transformation
# B x N x D x H x W x 3
points = self.frustum - post_trans.view(B, N, 1, 1, 1, 3)
points = torch.inverse(post_rots).view(B, N, 1, 1, 1, 3, 3).matmul(points.unsqueeze(-1))
# cam_to_lidar
points = torch.cat((points[:, :, :, :, :, :2] * points[:, :, :, :, :, 2:3], points[:, :, :, :, :, 2:3]), 5)
combine = camera2lidar_rots.matmul(torch.inverse(intrins))
points = combine.view(B, N, 1, 1, 1, 3, 3).matmul(points).squeeze(-1)
points += camera2lidar_trans.view(B, N, 1, 1, 1, 3)
if "extra_rots" in kwargs:
extra_rots = kwargs["extra_rots"]
points = extra_rots.view(B, 1, 1, 1, 1, 3, 3).repeat(1, N, 1, 1, 1, 1, 1) \
.matmul(points.unsqueeze(-1)).squeeze(-1)
if "extra_trans" in kwargs:
extra_trans = kwargs["extra_trans"]
points += extra_trans.view(B, 1, 1, 1, 1, 3).repeat(1, N, 1, 1, 1, 1)
return points
def bev_pool(self, geom_feats, x):
geom_feats = geom_feats.to(torch.float)
x = x.to(torch.float)
B, N, D, H, W, C = x.shape
Nprime = B * N * D * H * W
# flatten x
x = x.reshape(Nprime, C)
# flatten indices
geom_feats = ((geom_feats - (self.bx - self.dx / 2.0)) / self.dx).long()
geom_feats = geom_feats.view(Nprime, 3)
batch_ix = torch.cat([torch.full([Nprime // B, 1], ix, device=x.device, dtype=torch.long) for ix in range(B)])
geom_feats = torch.cat((geom_feats, batch_ix), 1)
# filter out points that are outside box
kept = (
(geom_feats[:, 0] >= 0)
& (geom_feats[:, 0] < self.nx[0])
& (geom_feats[:, 1] >= 0)
& (geom_feats[:, 1] < self.nx[1])
& (geom_feats[:, 2] >= 0)
& (geom_feats[:, 2] < self.nx[2])
)
x = x[kept]
geom_feats = geom_feats[kept]
x = bev_pool(x, geom_feats, B, self.nx[2], self.nx[0], self.nx[1])
# collapse Z
final = torch.cat(x.unbind(dim=2), 1)
return final
def get_cam_feats(self, x, d):
B, N, C, fH, fW = x.shape
d = d.view(B * N, *d.shape[2:])
x = x.view(B * N, C, fH, fW)
d = self.dtransform(d)
x = torch.cat([d, x], dim=1)
x = self.depthnet(x)
depth = x[:, : self.D].softmax(dim=1)
x = depth.unsqueeze(1) * x[:, self.D : (self.D + self.C)].unsqueeze(2)
x = x.view(B, N, self.C, self.D, fH, fW)
x = x.permute(0, 1, 3, 4, 5, 2)
return x
def forward(self, batch_dict):
"""
Args:
batch_dict:
image_fpn (list[tensor]): image features after the image neck
Returns:
batch_dict:
spatial_features_img (tensor): BEV features from the image modality
"""
x = batch_dict['image_fpn']
x = x[0]
BN, C, H, W = x.size()
img = x.view(int(BN/6), 6, C, H, W)
camera_intrinsics = batch_dict['camera_intrinsics']
camera2lidar = batch_dict['camera2lidar']
img_aug_matrix = batch_dict['img_aug_matrix']
lidar_aug_matrix = batch_dict['lidar_aug_matrix']
lidar2image = batch_dict['lidar2image']
intrins = camera_intrinsics[..., :3, :3]
post_rots = img_aug_matrix[..., :3, :3]
post_trans = img_aug_matrix[..., :3, 3]
camera2lidar_rots = camera2lidar[..., :3, :3]
camera2lidar_trans = camera2lidar[..., :3, 3]
points = batch_dict['points']
batch_size = BN // 6
depth = torch.zeros(batch_size, img.shape[1], 1, *self.image_size).to(points[0].device)
for b in range(batch_size):
batch_mask = points[:,0] == b
cur_coords = points[batch_mask][:, 1:4]
cur_img_aug_matrix = img_aug_matrix[b]
cur_lidar_aug_matrix = lidar_aug_matrix[b]
cur_lidar2image = lidar2image[b]
# inverse aug
cur_coords -= cur_lidar_aug_matrix[:3, 3]
cur_coords = torch.inverse(cur_lidar_aug_matrix[:3, :3]).matmul(
cur_coords.transpose(1, 0)
)
# lidar2image
cur_coords = cur_lidar2image[:, :3, :3].matmul(cur_coords)
cur_coords += cur_lidar2image[:, :3, 3].reshape(-1, 3, 1)
# get 2d coords
dist = cur_coords[:, 2, :]
cur_coords[:, 2, :] = torch.clamp(cur_coords[:, 2, :], 1e-5, 1e5)
cur_coords[:, :2, :] /= cur_coords[:, 2:3, :]
# do image aug
cur_coords = cur_img_aug_matrix[:, :3, :3].matmul(cur_coords)
cur_coords += cur_img_aug_matrix[:, :3, 3].reshape(-1, 3, 1)
cur_coords = cur_coords[:, :2, :].transpose(1, 2)
# normalize coords for grid sample
cur_coords = cur_coords[..., [1, 0]]
# filter points outside of images
on_img = (
(cur_coords[..., 0] < self.image_size[0])
& (cur_coords[..., 0] >= 0)
& (cur_coords[..., 1] < self.image_size[1])
& (cur_coords[..., 1] >= 0)
)
for c in range(on_img.shape[0]):
masked_coords = cur_coords[c, on_img[c]].long()
masked_dist = dist[c, on_img[c]]
depth[b, c, 0, masked_coords[:, 0], masked_coords[:, 1]] = masked_dist
extra_rots = lidar_aug_matrix[..., :3, :3]
extra_trans = lidar_aug_matrix[..., :3, 3]
geom = self.get_geometry(
camera2lidar_rots, camera2lidar_trans, intrins, post_rots,
post_trans, extra_rots=extra_rots, extra_trans=extra_trans,
)
# use points depth to assist the depth prediction in images
x = self.get_cam_feats(img, depth)
x = self.bev_pool(geom, x)
x = self.downsample(x)
# convert bev features from (b, c, x, y) to (b, c, y, x)
x = x.permute(0, 1, 3, 2)
batch_dict['spatial_features_img'] = x
return batch_dict
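# Shape sketch of the forward pass (comments only; the feature-map sizes are assumptions):
#   image_fpn[0]            (B*6, C, fH, fW)       multi-view FPN features
#   get_cam_feats           (B, 6, D, fH, fW, C')  per-pixel depth distribution x context
#   get_geometry + bev_pool (B, C'*nz, grid, grid) frustum points splatted onto the BEV grid
#   downsample + permute    'spatial_features_img' transposed to match the lidar BEV layout.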
from .bev_pool import bev_pool
import torch
from . import bev_pool_ext
__all__ = ["bev_pool"]
class QuickCumsum(torch.autograd.Function):
@staticmethod
def forward(ctx, x, geom_feats, ranks):
x = x.cumsum(0)
kept = torch.ones(x.shape[0], device=x.device, dtype=torch.bool)
kept[:-1] = ranks[1:] != ranks[:-1]
x, geom_feats = x[kept], geom_feats[kept]
x = torch.cat((x[:1], x[1:] - x[:-1]))
# save kept for backward
ctx.save_for_backward(kept)
# no gradient for geom_feats
ctx.mark_non_differentiable(geom_feats)
return x, geom_feats
@staticmethod
def backward(ctx, gradx, gradgeom):
(kept,) = ctx.saved_tensors
back = torch.cumsum(kept, 0)
back[kept] -= 1
val = gradx[back]
return val, None, None
class QuickCumsumCuda(torch.autograd.Function):
@staticmethod
def forward(ctx, x, geom_feats, ranks, B, D, H, W):
kept = torch.ones(x.shape[0], device=x.device, dtype=torch.bool)
kept[1:] = ranks[1:] != ranks[:-1]
interval_starts = torch.where(kept)[0].int()
interval_lengths = torch.zeros_like(interval_starts)
interval_lengths[:-1] = interval_starts[1:] - interval_starts[:-1]
interval_lengths[-1] = x.shape[0] - interval_starts[-1]
geom_feats = geom_feats.int()
out = bev_pool_ext.bev_pool_forward(
x,
geom_feats,
interval_lengths,
interval_starts,
B,
D,
H,
W,
)
ctx.save_for_backward(interval_starts, interval_lengths, geom_feats)
ctx.saved_shapes = B, D, H, W
return out
@staticmethod
def backward(ctx, out_grad):
interval_starts, interval_lengths, geom_feats = ctx.saved_tensors
B, D, H, W = ctx.saved_shapes
out_grad = out_grad.contiguous()
x_grad = bev_pool_ext.bev_pool_backward(
out_grad,
geom_feats,
interval_lengths,
interval_starts,
B,
D,
H,
W,
)
return x_grad, None, None, None, None, None, None
def bev_pool(feats, coords, B, D, H, W):
assert feats.shape[0] == coords.shape[0]
ranks = (
coords[:, 0] * (W * D * B)
+ coords[:, 1] * (D * B)
+ coords[:, 2] * B
+ coords[:, 3]
)
indices = ranks.argsort()
feats, coords, ranks = feats[indices], coords[indices], ranks[indices]
x = QuickCumsumCuda.apply(feats, coords, ranks, B, D, H, W)
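# pooled output is (B, D, H, W, C); move channels to the front: (B, C, D, H, W)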
x = x.permute(0, 4, 1, 2, 3).contiguous()
return x
#include <torch/torch.h>
#include <c10/cuda/CUDAGuard.h>
// CUDA function declarations
void bev_pool(int b, int d, int h, int w, int n, int c, int n_intervals, const float* x,
const int* geom_feats, const int* interval_starts, const int* interval_lengths, float* out);
void bev_pool_grad(int b, int d, int h, int w, int n, int c, int n_intervals, const float* out_grad,
const int* geom_feats, const int* interval_starts, const int* interval_lengths, float* x_grad);
/*
Function: pillar pooling (forward, cuda)
Args:
x : input features, FloatTensor[n, c]
geom_feats : input coordinates, IntTensor[n, 4]
interval_lengths : number of points pooled into each interval, IntTensor[n_intervals]
interval_starts : starting position of each pooled interval, IntTensor[n_intervals]
Return:
out : output features, FloatTensor[b, d, h, w, c]
*/
at::Tensor bev_pool_forward(
const at::Tensor _x,
const at::Tensor _geom_feats,
const at::Tensor _interval_lengths,
const at::Tensor _interval_starts,
int b, int d, int h, int w
) {
int n = _x.size(0);
int c = _x.size(1);
int n_intervals = _interval_lengths.size(0);
const at::cuda::OptionalCUDAGuard device_guard(device_of(_x));
const float* x = _x.data_ptr<float>();
const int* geom_feats = _geom_feats.data_ptr<int>();
const int* interval_lengths = _interval_lengths.data_ptr<int>();
const int* interval_starts = _interval_starts.data_ptr<int>();
auto options =
torch::TensorOptions().dtype(_x.dtype()).device(_x.device());
at::Tensor _out = torch::zeros({b, d, h, w, c}, options);
float* out = _out.data_ptr<float>();
bev_pool(
b, d, h, w, n, c, n_intervals, x,
geom_feats, interval_starts, interval_lengths, out
);
return _out;
}
/*
Function: pillar pooling (backward, cuda)
Args:
out_grad : gradient of the output BEV features, FloatTensor[b, d, h, w, c]
geom_feats : input coordinates, IntTensor[n, 4]
interval_lengths : number of points pooled into each interval, IntTensor[n_intervals]
interval_starts : starting position of each pooled interval, IntTensor[n_intervals]
Return:
x_grad : gradient of the input features, FloatTensor[n, c]
*/
at::Tensor bev_pool_backward(
const at::Tensor _out_grad,
const at::Tensor _geom_feats,
const at::Tensor _interval_lengths,
const at::Tensor _interval_starts,
int b, int d, int h, int w
) {
int n = _geom_feats.size(0);
int c = _out_grad.size(4);
int n_intervals = _interval_lengths.size(0);
const at::cuda::OptionalCUDAGuard device_guard(device_of(_out_grad));
const float* out_grad = _out_grad.data_ptr<float>();
const int* geom_feats = _geom_feats.data_ptr<int>();
const int* interval_lengths = _interval_lengths.data_ptr<int>();
const int* interval_starts = _interval_starts.data_ptr<int>();
auto options =
torch::TensorOptions().dtype(_out_grad.dtype()).device(_out_grad.device());
at::Tensor _x_grad = torch::zeros({n, c}, options);
float* x_grad = _x_grad.data_ptr<float>();
bev_pool_grad(
b, d, h, w, n, c, n_intervals, out_grad,
geom_feats, interval_starts, interval_lengths, x_grad
);
return _x_grad;
}
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
m.def("bev_pool_forward", &bev_pool_forward,
"bev_pool_forward");
m.def("bev_pool_backward", &bev_pool_backward,
"bev_pool_backward");
}
#include <stdio.h>
#include <stdlib.h>
/*
Function: pillar pooling
Args:
b : batch size
d : depth of the feature map
h : height of pooled feature map
w : width of pooled feature map
n : number of input points
c : number of channels
n_intervals : number of unique points
x : input features, FloatTensor[n, c]
geom_feats : input coordinates, IntTensor[n, 4]
interval_lengths : number of points pooled into each interval, IntTensor[n_intervals]
interval_starts : starting position of each pooled interval, IntTensor[n_intervals]
out : output features, FloatTensor[b, d, h, w, c]
*/
__global__ void bev_pool_kernel(int b, int d, int h, int w, int n, int c, int n_intervals,
const float *__restrict__ x,
const int *__restrict__ geom_feats,
const int *__restrict__ interval_starts,
const int *__restrict__ interval_lengths,
float* __restrict__ out) {
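// one thread per (interval, channel) pair: idx / c selects the interval, idx % c the channel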
int idx = blockIdx.x * blockDim.x + threadIdx.x;
int index = idx / c;
int cur_c = idx % c;
if (index >= n_intervals) return;
int interval_start = interval_starts[index];
int interval_length = interval_lengths[index];
const int* cur_geom_feats = geom_feats + interval_start * 4;
const float* cur_x = x + interval_start * c + cur_c;
float* cur_out = out + cur_geom_feats[3] * d * h * w * c +
cur_geom_feats[2] * h * w * c + cur_geom_feats[0] * w * c +
cur_geom_feats[1] * c + cur_c;
float psum = 0;
for(int i = 0; i < interval_length; i++){
psum += cur_x[i * c];
}
*cur_out = psum;
}
/*
Function: pillar pooling backward
Args:
b : batch size
d : depth of the feature map
h : height of pooled feature map
w : width of pooled feature map
n : number of input points
c : number of channels
n_intervals : number of unique points
out_grad : gradient of the BEV fmap from top, FloatTensor[b, d, h, w, c]
geom_feats : input coordinates, IntTensor[n, 4]
interval_lengths : number of points pooled into each interval, IntTensor[n_intervals]
interval_starts : starting position of each pooled interval, IntTensor[n_intervals]
x_grad : gradient of the image fmap, FloatTensor[n, c]
*/
__global__ void bev_pool_grad_kernel(int b, int d, int h, int w, int n, int c, int n_intervals,
const float *__restrict__ out_grad,
const int *__restrict__ geom_feats,
const int *__restrict__ interval_starts,
const int *__restrict__ interval_lengths,
float* __restrict__ x_grad) {
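// each thread writes the pooled cell's gradient back to one channel of every point in the
// interval; the forward op is a plain sum, so the gradient is broadcast unchanged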
int idx = blockIdx.x * blockDim.x + threadIdx.x;
int index = idx / c;
int cur_c = idx % c;
if (index >= n_intervals) return;
int interval_start = interval_starts[index];
int interval_length = interval_lengths[index];
const int* cur_geom_feats = geom_feats + interval_start * 4;
float* cur_x_grad = x_grad + interval_start * c + cur_c;
const float* cur_out_grad = out_grad + cur_geom_feats[3] * d * h * w * c +
cur_geom_feats[2] * h * w * c + cur_geom_feats[0] * w * c +
cur_geom_feats[1] * c + cur_c;
for(int i = 0; i < interval_length; i++){
cur_x_grad[i * c] = *cur_out_grad;
}
}
void bev_pool(int b, int d, int h, int w, int n, int c, int n_intervals, const float* x,
const int* geom_feats, const int* interval_starts, const int* interval_lengths, float* out) {
bev_pool_kernel<<<(int)ceil(((double)n_intervals * c / 256)), 256>>>(
b, d, h, w, n, c, n_intervals, x, geom_feats, interval_starts, interval_lengths, out
);
}
void bev_pool_grad(int b, int d, int h, int w, int n, int c, int n_intervals, const float* out_grad,
const int* geom_feats, const int* interval_starts, const int* interval_lengths, float* x_grad) {
bev_pool_grad_kernel<<<(int)ceil(((double)n_intervals * c / 256)), 256>>>(
b, d, h, w, n, c, n_intervals, out_grad, geom_feats, interval_starts, interval_lengths, x_grad
);
}
......@@ -117,5 +117,13 @@ if __name__ == '__main__':
],
),
make_cuda_ext(
name="bev_pool_ext",
module="pcdet.ops.bev_pool",
sources=[
"src/bev_pool.cpp",
"src/bev_pool_cuda.cu",
],
),
],
)
CLASS_NAMES: ['car', 'truck', 'construction_vehicle', 'bus', 'trailer',
'barrier', 'motorcycle', 'bicycle', 'pedestrian', 'traffic_cone']
DATA_CONFIG:
_BASE_CONFIG_: cfgs/dataset_configs/nuscenes_dataset.yaml
POINT_CLOUD_RANGE: [-54.0, -54.0, -5.0, 54.0, 54.0, 3.0]
CAMERA_CONFIG:
USE_CAMERA: True
IMAGE:
FINAL_DIM: [256,704]
RESIZE_LIM_TRAIN: [0.38, 0.55]
RESIZE_LIM_TEST: [0.48, 0.48]
DATA_AUGMENTOR:
DISABLE_AUG_LIST: ['placeholder']
AUG_CONFIG_LIST:
- NAME: random_world_flip
ALONG_AXIS_LIST: ['x', 'y']
- NAME: random_world_rotation
WORLD_ROT_ANGLE: [-0.78539816, 0.78539816]
- NAME: random_world_scaling
WORLD_SCALE_RANGE: [0.9, 1.1]
- NAME: random_world_translation
NOISE_TRANSLATE_STD: [0.5, 0.5, 0.5]
- NAME: imgaug
ROT_LIM: [-5.4, 5.4]
RAND_FLIP: true
DATA_PROCESSOR:
- NAME: mask_points_and_boxes_outside_range
REMOVE_OUTSIDE_BOXES: True
- NAME: shuffle_points
SHUFFLE_ENABLED: {
'train': True,
'test': True
}
- NAME: transform_points_to_voxels
VOXEL_SIZE: [0.075, 0.075, 0.2]
MAX_POINTS_PER_VOXEL: 10
MAX_NUMBER_OF_VOXELS: {
'train': 120000,
'test': 160000
}
- NAME: image_calibrate
- NAME: image_normalize
mean: [0.485, 0.456, 0.406]
std: [0.229, 0.224, 0.225]
MODEL:
NAME: BevFusion
VFE:
NAME: MeanVFE
BACKBONE_3D:
NAME: VoxelResBackBone8x
USE_BIAS: False
MAP_TO_BEV:
NAME: HeightCompression
NUM_BEV_FEATURES: 256
IMAGE_BACKBONE:
NAME: SwinTransformer
EMBED_DIMS: 96
DEPTHS: [2, 2, 6, 2]
NUM_HEADS: [3, 6, 12, 24]
WINDOW_SIZE: 7
MLP_RATIO: 4
DROP_RATE: 0.
ATTN_DROP_RATE: 0.
DROP_PATH_RATE: 0.2
PATCH_NORM: True
OUT_INDICES: [1, 2, 3]
WITH_CP: False
CONVERT_WEIGHTS: True
INIT_CFG:
type: Pretrained
checkpoint: swint-nuimages-pretrained.pth
NECK:
NAME: GeneralizedLSSFPN
IN_CHANNELS: [192, 384, 768]
OUT_CHANNELS: 256
START_LEVEL: 0
END_LEVEL: -1
NUM_OUTS: 3
VTRANSFORM:
NAME: DepthLSSTransform
IMAGE_SIZE: [256, 704]
IN_CHANNEL: 256
OUT_CHANNEL: 80
FEATURE_SIZE: [32, 88]
XBOUND: [-54.0, 54.0, 0.3]
YBOUND: [-54.0, 54.0, 0.3]
ZBOUND: [-10.0, 10.0, 20.0]
DBOUND: [1.0, 60.0, 0.5]
DOWNSAMPLE: 2
FUSER:
NAME: 'ConvFuser'
IN_CHANNEL: 336
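# 336 = 80 camera BEV channels (VTRANSFORM OUT_CHANNEL) + 256 lidar BEV channels (MAP_TO_BEV NUM_BEV_FEATURES)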
OUT_CHANNEL: 256
BACKBONE_2D:
NAME: BaseBEVBackbone
LAYER_NUMS: [5, 5]
LAYER_STRIDES: [1, 2]
NUM_FILTERS: [128, 256]
UPSAMPLE_STRIDES: [1, 2]
NUM_UPSAMPLE_FILTERS: [256, 256]
USE_CONV_FOR_NO_STRIDE: true
DENSE_HEAD:
CLASS_AGNOSTIC: False
NAME: TransFusionHead
USE_BIAS_BEFORE_NORM: False
NUM_PROPOSALS: 200
HIDDEN_CHANNEL: 128
NUM_CLASSES: 10
NUM_HEADS: 8
NMS_KERNEL_SIZE: 3
FFN_CHANNEL: 256
DROPOUT: 0.1
BN_MOMENTUM: 0.1
ACTIVATION: relu
NUM_HM_CONV: 2
SEPARATE_HEAD_CFG:
HEAD_ORDER: ['center', 'height', 'dim', 'rot', 'vel']
HEAD_DICT: {
'center': {'out_channels': 2, 'num_conv': 2},
'height': {'out_channels': 1, 'num_conv': 2},
'dim': {'out_channels': 3, 'num_conv': 2},
'rot': {'out_channels': 2, 'num_conv': 2},
'vel': {'out_channels': 2, 'num_conv': 2},
}
TARGET_ASSIGNER_CONFIG:
FEATURE_MAP_STRIDE: 8
DATASET: nuScenes
GAUSSIAN_OVERLAP: 0.1
MIN_RADIUS: 2
HUNGARIAN_ASSIGNER:
cls_cost: {'gamma': 2.0, 'alpha': 0.25, 'weight': 0.15}
reg_cost: {'weight': 0.25}
iou_cost: {'weight': 0.25}
LOSS_CONFIG:
LOSS_WEIGHTS: {
'cls_weight': 1.0,
'bbox_weight': 0.25,
'hm_weight': 1.0,
'code_weights': [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.2, 0.2]
}
LOSS_CLS:
use_sigmoid: true
gamma: 2.0
alpha: 0.25
POST_PROCESSING:
SCORE_THRESH: 0.0
POST_CENTER_RANGE: [-61.2, -61.2, -10.0, 61.2, 61.2, 10.0]
POST_PROCESSING:
RECALL_THRESH_LIST: [0.3, 0.5, 0.7]
SCORE_THRESH: 0.1
OUTPUT_RAW_SCORE: False
EVAL_METRIC: kitti
OPTIMIZATION:
BATCH_SIZE_PER_GPU: 3
NUM_EPOCHS: 6
OPTIMIZER: adam_cosineanneal
LR: 0.0001
WEIGHT_DECAY: 0.01
MOMENTUM: 0.9
BETAS: [0.9, 0.999]
MOMS: [0.9, 0.8052631]
PCT_START: 0.4
WARMUP_ITER: 500
DECAY_STEP_LIST: [35, 45]
LR_WARMUP: False
WARMUP_EPOCH: 1
GRAD_NORM_CLIP: 35
LOSS_SCALE_FP16: 32
\ No newline at end of file
......@@ -5,7 +5,7 @@ import torch.optim as optim
import torch.optim.lr_scheduler as lr_sched
from .fastai_optim import OptimWrapper
from .learning_schedules_fastai import CosineWarmupLR, OneCycle
from .learning_schedules_fastai import CosineWarmupLR, OneCycle, CosineAnnealing
def build_optimizer(model, optim_cfg):
......@@ -16,7 +16,7 @@ def build_optimizer(model, optim_cfg):
model.parameters(), lr=optim_cfg.LR, weight_decay=optim_cfg.WEIGHT_DECAY,
momentum=optim_cfg.MOMENTUM
)
elif optim_cfg.OPTIMIZER == 'adam_onecycle':
elif optim_cfg.OPTIMIZER in ['adam_onecycle','adam_cosineanneal']:
def children(m: nn.Module):
return list(m.children())
......@@ -52,6 +52,10 @@ def build_scheduler(optimizer, total_iters_each_epoch, total_epochs, last_epoch,
lr_scheduler = OneCycle(
optimizer, total_steps, optim_cfg.LR, list(optim_cfg.MOMS), optim_cfg.DIV_FACTOR, optim_cfg.PCT_START
)
elif optim_cfg.OPTIMIZER == 'adam_cosineanneal':
lr_scheduler = CosineAnnealing(
optimizer, total_steps, total_epochs, optim_cfg.LR, list(optim_cfg.MOMS), optim_cfg.PCT_START, optim_cfg.WARMUP_ITER
)
else:
lr_scheduler = lr_sched.LambdaLR(optimizer, lr_lbmd, last_epoch=last_epoch)
......
......@@ -41,7 +41,7 @@ class LRSchedulerStep(object):
self.mom_phases.append((int(start * total_step), total_step, lambda_func))
assert self.mom_phases[0][0] == 0
def step(self, step):
def step(self, step, epoch=None):
for start, end, func in self.lr_phases:
if step >= start:
self.optimizer.lr = func((step - start) / (end - start))
......@@ -83,12 +83,60 @@ class CosineWarmupLR(lr_sched._LRScheduler):
self.eta_min = eta_min
super(CosineWarmupLR, self).__init__(optimizer, last_epoch)
def get_lr(self):
def get_lr(self, epoch=None):
return [self.eta_min + (base_lr - self.eta_min) *
(1 - math.cos(math.pi * self.last_epoch / self.T_max)) / 2
for base_lr in self.base_lrs]
def linear_warmup(end, lr_max, pct):
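# ramps the LR linearly from lr_max / 3 at pct = 0 up to lr_max at pct = end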
k = (1 - pct / end) * (1 - 0.33333333)
warmup_lr = lr_max * (1 - k)
return warmup_lr
class CosineAnnealing(LRSchedulerStep):
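# LR schedule: linear warmup for the first `warmup_iter` steps, then per-epoch cosine annealing
# from lr_max down to lr_max * 0.001; momentum follows OneCycle-style cosine phases split at pct_start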
def __init__(self, fai_optimizer, total_step, total_epoch, lr_max, moms, pct_start, warmup_iter):
self.lr_max = lr_max
self.moms = moms
self.pct_start = pct_start
mom_phases = ((0, partial(annealing_cos, *self.moms)),
(self.pct_start, partial(annealing_cos,
*self.moms[::-1])))
fai_optimizer.lr, fai_optimizer.mom = lr_max, self.moms[0]
self.optimizer = fai_optimizer
self.total_step = total_step
self.warmup_iter = warmup_iter
self.total_epoch = total_epoch
self.mom_phases = []
for i, (start, lambda_func) in enumerate(mom_phases):
if len(self.mom_phases) != 0:
assert self.mom_phases[-1][0] < start
if isinstance(lambda_func, str):
lambda_func = eval(lambda_func)
if i < len(mom_phases) - 1:
self.mom_phases.append((int(start * total_step), int(mom_phases[i + 1][0] * total_step), lambda_func))
else:
self.mom_phases.append((int(start * total_step), total_step, lambda_func))
assert self.mom_phases[0][0] == 0
def step(self, step, epoch):
# update lr
if step < self.warmup_iter:
self.optimizer.lr = linear_warmup(self.warmup_iter, self.lr_max, step)
else:
target_lr = self.lr_max * 0.001
cos_lr = annealing_cos(self.lr_max, target_lr, epoch / self.total_epoch)
self.optimizer.lr = cos_lr
# update mom
for start, end, func in self.mom_phases:
if step >= start:
self.optimizer.mom = func((step - start) / (end - start))
class FakeOptim:
def __init__(self):
self.lr = 0
......
......@@ -39,7 +39,7 @@ def train_one_epoch(model, optimizer, train_loader, model_func, lr_scheduler, ac
data_timer = time.time()
cur_data_time = data_timer - end
lr_scheduler.step(accumulated_iter)
lr_scheduler.step(accumulated_iter, cur_epoch)
try:
cur_lr = float(optimizer.lr)
......