"components/metrics/src/main.rs" did not exist on "494d56255a94f3546558bfe84c35d38b3ffcfed1"
Unverified Commit d3208987 authored by Wenhai Wang's avatar Wenhai Wang Committed by GitHub
Browse files

Merge branch 'master' into openlane

parents 2341b283 198ca8f9
from .loading import LoadMultiViewImagesFromFiles
from .formating import FormatBundleMap
from .transform import ResizeMultiViewImages, PadMultiViewImages, Normalize3D
from .vectorize import VectorizeMap
from .poly_bbox import PolygonizeLocalMapBbox
# for argoverse
__all__ = [
'LoadMultiViewImagesFromFiles',
'FormatBundleMap', 'Normalize3D', 'ResizeMultiViewImages', 'PadMultiViewImages',
'VectorizeMap', 'PolygonizeLocalMapBbox'
]
import numpy as np
from mmcv.parallel import DataContainer as DC
from mmdet3d.core.points import BasePoints
from mmdet.datasets.builder import PIPELINES
from mmdet.datasets.pipelines import to_tensor
@PIPELINES.register_module()
class FormatBundleMap(object):
"""Format data for map tasks and then collect data for model input.
These fields are formatted as follows.
- img: (1) transpose, (2) to tensor, (3) to DataContainer (stack=True)
- semantic_mask (if exists): (1) to tensor, (2) to DataContainer (stack=True)
- vectors (if exists): (1) to DataContainer (cpu_only=True)
- img_metas: (1) to DataContainer (cpu_only=True)
"""
def __init__(self, process_img=True,
keys=['img', 'semantic_mask', 'vectors'],
meta_keys=['intrinsics', 'extrinsics']):
self.process_img = process_img
self.keys = keys
self.meta_keys = meta_keys
def __call__(self, results):
"""Call function to transform and format common fields in results.
Args:
results (dict): Result dict contains the data to convert.
Returns:
dict: The result dict contains the data that is formatted with
default bundle.
"""
# Format 3D data
if 'points' in results:
assert isinstance(results['points'], BasePoints)
results['points'] = DC(results['points'].tensor)
for key in ['voxels', 'coors', 'voxel_centers', 'num_points']:
if key not in results:
continue
results[key] = DC(to_tensor(results[key]), stack=False)
if 'img' in results and self.process_img:
if isinstance(results['img'], list):
# process multiple imgs in single frame
imgs = [img.transpose(2, 0, 1) for img in results['img']]
imgs = np.ascontiguousarray(np.stack(imgs, axis=0))
results['img'] = DC(to_tensor(imgs), stack=True)
else:
img = np.ascontiguousarray(results['img'].transpose(2, 0, 1))
results['img'] = DC(to_tensor(img), stack=True)
if 'semantic_mask' in results:
results['semantic_mask'] = DC(to_tensor(results['semantic_mask']), stack=True)
if 'vectors' in results:
# vectors may have different sizes
vectors = results['vectors']
results['vectors'] = DC(vectors, stack=False, cpu_only=True)
if 'polys' in results:
results['polys'] = DC(results['polys'], stack=False, cpu_only=True)
return results
def __repr__(self):
"""str: Return a string that describes the module."""
repr_str = self.__class__.__name__
        repr_str += f'(process_img={self.process_img})'
        return repr_str
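# A hedged usage sketch (not part of the original pipeline): it shows how
# FormatBundleMap stacks a list of HWC multi-view images into a single
# (N, C, H, W) DataContainer. The shapes below are illustrative assumptions.
def _demo_format_bundle_map():
    imgs = [np.zeros((128, 352, 3), dtype=np.float32) for _ in range(6)]
    results = {'img': imgs, 'vectors': {0: []}}
    results = FormatBundleMap()(results)
    # 'img' is now a stacked tensor wrapped in a DataContainer
    assert tuple(results['img'].data.shape) == (6, 3, 128, 352)
    # 'vectors' stays on CPU as a cpu_only DataContainer
    assert results['vectors'].cpu_only
    return results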
import mmcv
import numpy as np
from mmdet.datasets.builder import PIPELINES
@PIPELINES.register_module(force=True)
class LoadMultiViewImagesFromFiles(object):
"""Load multi channel images from a list of separate channel files.
    Expects results['img_filenames'] to be a list of filenames.
Args:
to_float32 (bool): Whether to convert the img to float32.
Defaults to False.
color_type (str): Color type of the file. Defaults to 'unchanged'.
"""
def __init__(self, to_float32=False, color_type='unchanged'):
self.to_float32 = to_float32
self.color_type = color_type
def __call__(self, results):
"""Call function to load multi-view image from files.
Args:
results (dict): Result dict containing multi-view image filenames.
Returns:
dict: The result dict containing the multi-view image data. \
Added keys and values are described below.
- filename (str): Multi-view image filenames.
- img (np.ndarray): Multi-view image arrays.
- img_shape (tuple[int]): Shape of multi-view image arrays.
- ori_shape (tuple[int]): Shape of original image arrays.
- pad_shape (tuple[int]): Shape of padded image arrays.
- scale_factor (float): Scale factor.
- img_norm_cfg (dict): Normalization configuration of images.
"""
filename = results['img_filenames']
img = [mmcv.imread(name, self.color_type) for name in filename]
if self.to_float32:
img = [i.astype(np.float32) for i in img]
results['img'] = img
results['img_shape'] = [i.shape for i in img]
results['ori_shape'] = [i.shape for i in img]
# Set initial values for default meta_keys
results['pad_shape'] = [i.shape for i in img]
# results['scale_factor'] = 1.0
num_channels = 1 if len(img[0].shape) < 3 else img[0].shape[2]
results['img_norm_cfg'] = dict(
mean=np.zeros(num_channels, dtype=np.float32),
std=np.ones(num_channels, dtype=np.float32),
to_rgb=False)
results['img_fields'] = ['img']
return results
def __repr__(self):
"""str: Return a string that describes the module."""
return f'{self.__class__.__name__} (to_float32={self.to_float32}, '\
f"color_type='{self.color_type}')"
import numpy as np
from mmdet.datasets.builder import PIPELINES
from shapely.geometry import LineString
@PIPELINES.register_module(force=True)
class PolygonizeLocalMapBbox(object):
"""Pre-Processing used by vectormapnet model.
Args:
canvas_size (tuple or list): bev feature size
coord_dim (int): dimension of point's coordinate
num_class (int): number of classes
threshold (float): threshold for minimum bounding box size
"""
def __init__(self,
canvas_size=(200, 100),
coord_dim=2,
num_class=3,
threshold=6/200,
):
self.canvas_size = np.array(canvas_size)
self.num_class = num_class
# for keypoints
self.threshold = threshold
self.coord_dim = coord_dim
self.map_stop_idx = 0
self.coord_dim_start_idx = 1
def format_polyline_map(self, vectors):
polylines, polyline_masks, polyline_weights = [], [], []
        # quantize each label's lines individually.
for label, _lines in vectors.items():
for polyline in _lines:
                # compute per-point loss weights and pad for the stop token.
if label == 2:
polyline_weight = evaluate_line(polyline).reshape(-1)
else:
polyline_weight = np.ones_like(polyline).reshape(-1)
polyline_weight = np.pad(
polyline_weight, ((0, 1),), constant_values=1.)
polyline_weight = polyline_weight/polyline_weight.sum()
                # flatten and quantize
fpolyline = quantize_verts(
polyline, self.canvas_size, self.coord_dim)
fpolyline = fpolyline.reshape(-1)
                # re-index starting from 1 and append a zero stop token (EOS)
fpolyline = \
np.pad(fpolyline + self.coord_dim_start_idx, ((0, 1),),
constant_values=0)
                fpolyline_msk = np.ones(fpolyline.shape, dtype=bool)
polyline_masks.append(fpolyline_msk)
polyline_weights.append(polyline_weight)
polylines.append(fpolyline)
        return polylines, polyline_masks, polyline_weights
def format_keypoint(self, vectors):
kps, kp_labels = [], []
qkps, qkp_masks = [], []
        # quantize each label's lines individually.
for label, _lines in vectors.items():
for polyline in _lines:
kp = get_bbox(polyline, self.threshold)
kps.append(kp)
kp_labels.append(label)
gkp = kp
                # flatten and quantize
fkp = quantize_verts(gkp, self.canvas_size, self.coord_dim)
fkp = fkp.reshape(-1)
                fkps_msk = np.ones(fkp.shape, dtype=bool)
qkp_masks.append(fkps_msk)
qkps.append(fkp)
qkps = np.stack(qkps)
qkp_msks = np.stack(qkp_masks)
# format det
kps = np.stack(kps, axis=0).astype(np.float32)*self.canvas_size
kp_labels = np.array(kp_labels)
# restrict the boundary
kps[..., 0] = np.clip(kps[..., 0], 0.1, self.canvas_size[0]-0.1)
kps[..., 1] = np.clip(kps[..., 1], 0.1, self.canvas_size[1]-0.1)
# nbox, boxsize(4)*coord_dim(2)
kps = kps.reshape(kps.shape[0], -1)
# unflatten_seq(qkps)
return kps, kp_labels, qkps, qkp_msks,
def Polygonization(self, input_dict):
'''
Process vertices.
'''
vectors = input_dict['vectors']
n_lines = 0
for label, lines in vectors.items():
n_lines += len(lines)
if not n_lines:
input_dict['polys'] = []
return input_dict
polyline_map, polyline_map_mask, polyline_map_weight = \
self.format_polyline_map(vectors)
keypoint, keypoint_label, qkeypoint, qkeypoint_mask = \
self.format_keypoint(vectors)
# gather
polys = {
# for det
'keypoint': keypoint,
'det_label': keypoint_label,
# for gen
'gen_label': keypoint_label,
'qkeypoint': qkeypoint,
'qkeypoint_mask': qkeypoint_mask,
'polylines': polyline_map, # List[array]
'polyline_masks': polyline_map_mask, # List[array]
'polyline_weights': polyline_map_weight
}
# Format outputs
input_dict['polys'] = polys
return input_dict
def __call__(self, input_dict):
input_dict = self.Polygonization(input_dict)
return input_dict
def evaluate_line(polyline):
edge = np.linalg.norm(polyline[1:] - polyline[:-1], axis=-1)
start_end_weight = edge[(0, -1), ].copy()
mid_weight = (edge[:-1] + edge[1:]) * .5
pts_weight = np.concatenate(
(start_end_weight[:1], mid_weight, start_end_weight[-1:]))
denominator = pts_weight.sum()
denominator = 1 if denominator == 0 else denominator
pts_weight /= denominator
# add weights for stop index
pts_weight = np.repeat(pts_weight, 2)/2
pts_weight = np.pad(pts_weight, ((0, 1)),
constant_values=1/(len(polyline)*2))
return pts_weight
def quantize_verts(verts, canvas_size, coord_dim):
"""Convert vertices from its original range ([-1,1]) to discrete values in [0, n_bits**2 - 1].
Args:
verts (array): vertices coordinates, shape (seqlen, coords_dim)
canvas_size (tuple): bev feature size
coord_dim (int): dimension of point coordinates
Returns:
quantized_verts (array): quantized vertices, shape (seqlen, coords_dim)
"""
min_range = 0
max_range = 1
    range_quantize = np.array(canvas_size) - 1  # e.g. canvas size 200 gives indices 0..199
verts_ratio = (verts[:, :coord_dim] - min_range) / (
max_range - min_range)
verts_quantize = verts_ratio * range_quantize[:coord_dim]
return verts_quantize.astype('int32')
def get_bbox(polyline, threshold):
"""Convert vertices from its original range ([-1,1]) to discrete values in [0, n_bits**2 - 1].
Args:
polyline (array): point coordinates, shape (seqlen, 2)
threshold (float): threshold for minimum bbox size
Returns:
bbox (array): bounding box in xyxy format, shape (2, 2)
"""
eps = 1e-4
polyline = LineString(polyline)
bbox = polyline.bounds
minx, miny, maxx, maxy = bbox
W, H = maxx-minx, maxy-miny
if W < threshold or H < threshold:
remain = max((threshold - min(W, H))/2, eps)
bbox = polyline.buffer(remain).envelope.bounds
minx, miny, maxx, maxy = bbox
bbox_np = np.array([[minx, miny], [maxx, maxy]])
bbox_np = np.clip(bbox_np, 0., 1.)
return bbox_np
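# A hedged sketch: quantize a toy normalized polyline and compute its padded
# bounding box. Canvas size and threshold mirror the class defaults above.
def _demo_quantize_and_bbox():
    polyline = np.array([[0.1, 0.2], [0.5, 0.2], [0.9, 0.8]])
    q = quantize_verts(polyline, canvas_size=(200, 100), coord_dim=2)
    # tokens fall in [0, canvas_size - 1] per axis
    assert q.min() >= 0 and q[:, 0].max() <= 199 and q[:, 1].max() <= 99
    bbox = get_bbox(polyline, threshold=6 / 200)
    assert bbox.shape == (2, 2)  # [[minx, miny], [maxx, maxy]] in [0, 1]
    return q, bbox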
import numpy as np
import mmcv
from mmdet.datasets.builder import PIPELINES
@PIPELINES.register_module(force=True)
class Normalize3D(object):
"""Normalize the image.
Added key is "img_norm_cfg".
Args:
mean (sequence): Mean values of 3 channels.
std (sequence): Std values of 3 channels.
to_rgb (bool): Whether to convert the image from BGR to RGB,
default is true.
"""
def __init__(self, mean, std, to_rgb=True):
self.mean = np.array(mean, dtype=np.float32)
self.std = np.array(std, dtype=np.float32)
self.to_rgb = to_rgb
def __call__(self, results):
"""Call function to normalize images.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Normalized results, 'img_norm_cfg' key is added into
result dict.
"""
for key in results.get('img_fields', ['img']):
results[key] = [mmcv.imnormalize(
img, self.mean, self.std, self.to_rgb) for img in results[key]]
results['img_norm_cfg'] = dict(
mean=self.mean, std=self.std, to_rgb=self.to_rgb)
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(mean={self.mean}, std={self.std}, to_rgb={self.to_rgb})'
return repr_str
@PIPELINES.register_module(force=True)
class PadMultiViewImages(object):
"""Pad multi-view images and change intrinsics
There are two padding modes: (1) pad to a fixed size and (2) pad to the
minimum size that is divisible by some number.
Added keys are "pad_shape", "pad_fixed_size", "pad_size_divisor",
If set `change_intrinsics=True`, key 'cam_intrinsics' and 'ego2img' will be changed.
Args:
size (tuple, optional): Fixed padding size, (h, w).
size_divisor (int, optional): The divisor of padded size.
pad_val (float, optional): Padding value, 0 by default.
change_intrinsics (bool): whether to update intrinsics.
"""
def __init__(self, size=None, size_divisor=None, pad_val=0, change_intrinsics=False):
self.size = size
self.size_divisor = size_divisor
self.pad_val = pad_val
# only one of size and size_divisor should be valid
assert size is not None or size_divisor is not None
assert size is None or size_divisor is None
self.change_intrinsics = change_intrinsics
def _pad_img(self, results):
"""Pad images according to ``self.size``."""
original_shape = [img.shape for img in results['img']]
for key in results.get('img_fields', ['img']):
if self.size is not None:
padded_img = [mmcv.impad(
img, shape=self.size, pad_val=self.pad_val) for img in results[key]]
elif self.size_divisor is not None:
padded_img = [mmcv.impad_to_multiple(
img, self.size_divisor, pad_val=self.pad_val) for img in results[key]]
results[key] = padded_img
if self.change_intrinsics:
post_intrinsics, post_ego2imgs = [], []
for img, oshape, cam_intrinsic, ego2img in zip(results['img'], \
original_shape, results['cam_intrinsics'], results['ego2img']):
scaleW = img.shape[1] / oshape[1]
scaleH = img.shape[0] / oshape[0]
rot_resize_matrix = np.array([
[scaleW, 0, 0, 0],
[0, scaleH, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]])
post_intrinsic = rot_resize_matrix[:3, :3] @ cam_intrinsic
post_ego2img = rot_resize_matrix @ ego2img
post_intrinsics.append(post_intrinsic)
post_ego2imgs.append(post_ego2img)
results.update({
'cam_intrinsics': post_intrinsics,
'ego2img': post_ego2imgs,
})
results['img_shape'] = [img.shape for img in padded_img]
results['img_fixed_size'] = self.size
results['img_size_divisor'] = self.size_divisor
def __call__(self, results):
"""Call function to pad images, masks, semantic segmentation maps.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Updated result dict.
"""
self._pad_img(results)
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(size={self.size}, '
repr_str += f'size_divisor={self.size_divisor}, '
        repr_str += f'pad_val={self.pad_val}, '
        repr_str += f'change_intrinsics={self.change_intrinsics})'
return repr_str
@PIPELINES.register_module(force=True)
class ResizeMultiViewImages(object):
"""Resize mulit-view images and change intrinsics
If set `change_intrinsics=True`, key 'cam_intrinsics' and 'ego2img' will be changed
Args:
size (tuple, optional): resize target size, (h, w).
change_intrinsics (bool): whether to update intrinsics.
"""
def __init__(self, size, change_intrinsics=True):
self.size = size
self.change_intrinsics = change_intrinsics
def __call__(self, results:dict):
new_imgs, post_intrinsics, post_ego2imgs = [], [], []
for img, cam_intrinsic, ego2img in zip(results['img'], \
results['cam_intrinsics'], results['ego2img']):
tmp, scaleW, scaleH = mmcv.imresize(img,
# NOTE: mmcv.imresize expect (w, h) shape
(self.size[1], self.size[0]),
return_scale=True)
new_imgs.append(tmp)
rot_resize_matrix = np.array([
[scaleW, 0, 0, 0],
[0, scaleH, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]])
post_intrinsic = rot_resize_matrix[:3, :3] @ cam_intrinsic
post_ego2img = rot_resize_matrix @ ego2img
post_intrinsics.append(post_intrinsic)
post_ego2imgs.append(post_ego2img)
results['img'] = new_imgs
results['img_shape'] = [img.shape for img in new_imgs]
if self.change_intrinsics:
results.update({
'cam_intrinsics': post_intrinsics,
'ego2img': post_ego2imgs,
})
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(size={self.size}, '
repr_str += f'change_intrinsics={self.change_intrinsics})'
return repr_str
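# A hedged numeric sketch of the intrinsics update used by both classes
# above: scaling the image by (scaleW, scaleH) scales (fx, cx) and (fy, cy).
# The camera matrix below is made up for illustration.
def _demo_intrinsics_rescale(scaleW=0.5, scaleH=0.5):
    cam_intrinsic = np.array([[1000., 0., 800.],
                              [0., 1000., 450.],
                              [0., 0., 1.]])
    rot_resize_matrix = np.array([[scaleW, 0, 0, 0],
                                  [0, scaleH, 0, 0],
                                  [0, 0, 1, 0],
                                  [0, 0, 0, 1]])
    post_intrinsic = rot_resize_matrix[:3, :3] @ cam_intrinsic
    # fx and cx are halved; fy and cy likewise
    assert post_intrinsic[0, 0] == 500. and post_intrinsic[1, 2] == 225.
    return post_intrinsic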
import numpy as np
from mmdet.datasets.builder import PIPELINES
from shapely.geometry import LineString
from numpy.typing import NDArray
from typing import List, Tuple, Union, Dict
@PIPELINES.register_module(force=True)
class VectorizeMap(object):
"""Generate vectoized map and put into `semantic_mask` key.
Concretely, shapely geometry objects are converted into sample points (ndarray).
We use args `sample_num`, `sample_dist`, `simplify` to specify sampling method.
Args:
roi_size (tuple or list): bev range .
normalize (bool): whether to normalize points to range (0, 1).
coords_dim (int): dimension of point coordinates.
simplify (bool): whether to use simpily function. If true, `sample_num` \
and `sample_dist` will be ignored.
sample_num (int): number of points to interpolate from a polyline. Set to -1 to ignore.
sample_dist (float): interpolate distance. Set to -1 to ignore.
"""
def __init__(self,
roi_size: Union[Tuple, List],
normalize: bool,
coords_dim: int,
simplify: bool=False,
sample_num: int=-1,
sample_dist: float=-1,
):
self.coords_dim = coords_dim
self.sample_num = sample_num
self.sample_dist = sample_dist
self.roi_size = np.array(roi_size)
self.normalize = normalize
self.simplify = simplify
self.sample_fn = None
if sample_dist > 0:
assert sample_num < 0 and not simplify
self.sample_fn = self.interp_fixed_dist
if sample_num > 0:
assert sample_dist < 0 and not simplify
self.sample_fn = self.interp_fixed_num
def interp_fixed_num(self, line: LineString) -> NDArray:
''' Interpolate a line to fixed number of points.
Args:
line (LineString): line
Returns:
points (array): interpolated points, shape (N, 2)
'''
distances = np.linspace(0, line.length, self.sample_num)
sampled_points = np.array([list(line.interpolate(distance).coords)
for distance in distances]).squeeze()
return sampled_points
def interp_fixed_dist(self, line: LineString) -> NDArray:
''' Interpolate a line at fixed interval.
Args:
line (LineString): line
Returns:
points (array): interpolated points, shape (N, 2)
'''
distances = list(np.arange(self.sample_dist, line.length, self.sample_dist))
# make sure to sample at least two points when sample_dist > line.length
distances = [0,] + distances + [line.length,]
sampled_points = np.array([list(line.interpolate(distance).coords)
for distance in distances]).squeeze()
return sampled_points
def get_vectorized_lines(self, map_geoms: Dict) -> Dict:
        ''' Vectorize map elements. Iterate over the input dict and apply the
        specified sample function.
        Args:
            map_geoms (Dict): dict mapping class labels to lists of geometries.
        Returns:
            vectors (Dict): dict of vectorized map elements (arrays).
        '''
vectors = {}
for label, geom_list in map_geoms.items():
vectors[label] = []
for geom in geom_list:
if geom.geom_type == 'LineString':
geom = LineString(np.array(geom.coords)[:, :self.coords_dim])
if self.simplify:
line = geom.simplify(0.2, preserve_topology=True)
line = np.array(line.coords)
elif self.sample_fn:
line = self.sample_fn(geom)
else:
                        line = np.array(geom.coords)
if self.normalize:
line = self.normalize_line(line)
vectors[label].append(line)
elif geom.geom_type == 'Polygon':
# polygon objects will not be vectorized
continue
else:
raise ValueError('map geoms must be either LineString or Polygon!')
return vectors
def normalize_line(self, line: NDArray) -> NDArray:
        ''' Convert points to range (0, 1).
        Args:
            line (array): points, shape (N, coords_dim)
        Returns:
            normalized (array): normalized points.
        '''
origin = -np.array([self.roi_size[0]/2, self.roi_size[1]/2])
line[:, :2] = line[:, :2] - origin
        # map from [0, roi_size] into the open interval (0, 1)
eps = 2
line[:, :2] = line[:, :2] / (self.roi_size + eps)
return line
def __call__(self, input_dict):
map_geoms = input_dict['map_geoms']
input_dict['vectors'] = self.get_vectorized_lines(map_geoms)
return input_dict
def __repr__(self):
repr_str = self.__class__.__name__
        repr_str += f'(simplify={self.simplify}, '
        repr_str += f'sample_num={self.sample_num}, '
        repr_str += f'sample_dist={self.sample_dist}, '
        repr_str += f'roi_size={self.roi_size}, '
        repr_str += f'normalize={self.normalize}, '
        repr_str += f'coords_dim={self.coords_dim})'
return repr_str
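# A hedged sketch: fixed-number interpolation of a toy LineString through
# the same code path as interp_fixed_num above. Arguments are illustrative.
def _demo_interp_fixed_num():
    vec = VectorizeMap(roi_size=(60, 30), normalize=False, coords_dim=2,
                       sample_num=10)
    line = LineString([(0, 0), (10, 0)])
    pts = vec.interp_fixed_num(line)
    # 10 evenly spaced points from (0, 0) to (10, 0)
    assert pts.shape == (10, 2) and pts[-1, 0] == 10.0
    return pts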
from .backbones import *
from .heads import *
from .losses import *
from .mapers import *
from .transformer_utils import *
from .assigner import *
from .assigner import HungarianLinesAssigner
from .match_cost import MapQueriesCost, BBoxLogitsCost, DynamicLinesCost, IoUCostC, BBoxCostC, LinesCost, LinesFixNumChamferCost, ClsSigmoidCost
import torch
from mmdet.core.bbox.builder import BBOX_ASSIGNERS
from mmdet.core.bbox.assigners import AssignResult
from mmdet.core.bbox.assigners import BaseAssigner
from mmdet.core.bbox.match_costs import build_match_cost
try:
from scipy.optimize import linear_sum_assignment
except ImportError:
linear_sum_assignment = None
@BBOX_ASSIGNERS.register_module()
class HungarianLinesAssigner(BaseAssigner):
"""
Computes one-to-one matching between predictions and ground truth.
This class computes an assignment between the targets and the predictions
based on the costs. The costs are weighted sum of three components:
classification cost and regression L1 cost. The
targets don't include the no_object, so generally there are more
predictions than targets. After the one-to-one matching, the un-matched
are treated as backgrounds. Thus each query prediction will be assigned
with `0` or a positive integer indicating the ground truth index:
- 0: negative sample, no assigned gt
- positive integer: positive sample, index (1-based) of assigned gt
Args:
cls_weight (int | float, optional): The scale factor for classification
cost. Default 1.0.
bbox_weight (int | float, optional): The scale factor for regression
L1 cost. Default 1.0.
"""
def __init__(self,
cost=dict(
type='MapQueriesCost',
cls_cost=dict(type='ClassificationCost', weight=1.),
reg_cost=dict(type='LinesCost', weight=1.0),
),
pc_range=None,
**kwargs):
self.pc_range = pc_range
self.cost = build_match_cost(cost)
def assign(self,
preds: dict,
gts: dict,
gt_bboxes_ignore=None,
eps=1e-7):
"""
Computes one-to-one matching based on the weighted costs.
This method assign each query prediction to a ground truth or
background. The `assigned_gt_inds` with -1 means don't care,
0 means negative sample, and positive number is the index (1-based)
of assigned gt.
The assignment is done in the following steps, the order matters.
1. assign every prediction to -1
2. compute the weighted costs
3. do Hungarian matching on CPU based on the costs
4. assign all to 0 (background) first, then for each matched pair
between predictions and gts, treat this prediction as foreground
and assign the corresponding gt index (plus 1) to it.
        Args:
            preds (dict): prediction dict containing
                - 'lines' (Tensor): predicted normalized lines,
                  shape [num_query, num_points, 2]
                - 'scores' (Tensor): predicted classification logits,
                  shape [num_query, num_class]
            gts (dict): ground-truth dict containing
                - 'lines' (Tensor): ground truth lines,
                  shape [num_gt, num_points, 2]
                - 'labels' (Tensor): labels of the gt lines, shape (num_gt,)
            gt_bboxes_ignore (Tensor, optional): Ground truth bboxes that are
                labelled as `ignored`. Default None.
            eps (int | float, optional): A value added to the denominator for
                numerical stability. Default 1e-7.
Returns:
:obj:`AssignResult`: The assigned result.
"""
assert gt_bboxes_ignore is None, \
'Only case when gt_bboxes_ignore is None is supported.'
num_gts, num_lines = gts['lines'].size(0), preds['lines'].size(0)
# 1. assign -1 by default
assigned_gt_inds = \
preds['lines'].new_full((num_lines,), -1, dtype=torch.long)
assigned_labels = \
preds['lines'].new_full((num_lines,), -1, dtype=torch.long)
if num_gts == 0 or num_lines == 0:
# No ground truth or boxes, return empty assignment
if num_gts == 0:
# No ground truth, assign all to background
assigned_gt_inds[:] = 0
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
# 2. compute the weighted costs
cost = self.cost(preds, gts)
# 3. do Hungarian matching on CPU using linear_sum_assignment
cost = cost.detach().cpu().numpy()
if linear_sum_assignment is None:
raise ImportError('Please run "pip install scipy" '
'to install scipy first.')
        try:
            matched_row_inds, matched_col_inds = linear_sum_assignment(cost)
        except ValueError as e:
            # linear_sum_assignment raises on NaN/inf cost entries
            raise ValueError(
                f'linear_sum_assignment failed on cost matrix '
                f'(max {cost.max()}, min {cost.min()}): {e}') from e
matched_row_inds = torch.from_numpy(matched_row_inds).to(
preds['lines'].device)
matched_col_inds = torch.from_numpy(matched_col_inds).to(
preds['lines'].device)
# 4. assign backgrounds and foregrounds
# assign all indices to backgrounds first
assigned_gt_inds[:] = 0
# assign foregrounds based on matching results
assigned_gt_inds[matched_row_inds] = matched_col_inds + 1
assigned_labels[matched_row_inds] = gts['labels'][matched_col_inds]
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
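# A hedged sketch of step 3 above: Hungarian matching on a toy 3x2 cost
# matrix, where row i is a prediction and column j a ground-truth line.
def _demo_hungarian_matching():
    import numpy as np
    cost = np.array([[0.9, 0.1],
                     [0.4, 0.8],
                     [0.2, 0.3]])
    rows, cols = linear_sum_assignment(cost)
    # prediction 0 -> gt 1 and prediction 2 -> gt 0 minimise the total cost;
    # prediction 1 stays unmatched and would be assigned to background
    assert list(zip(rows.tolist(), cols.tolist())) == [(0, 1), (2, 0)]
    return rows, cols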
import torch
from mmdet.core.bbox.match_costs.builder import MATCH_COST
from mmdet.core.bbox.match_costs import build_match_cost
from mmdet.core.bbox.iou_calculators import bbox_overlaps
from mmdet.core.bbox.transforms import bbox_cxcywh_to_xyxy
def chamfer_distance(pred, gt):
    '''
    Symmetric, truncated chamfer distance between two point sets.
    Args:
        pred: [num_pred_points, 2]
        gt: [num_gt_points, 2]
    Returns:
        dist: scalar tensor, sum of both directed mean distances
    '''
# [num_points, num_gt]
dist_mat = torch.cdist(pred, gt, p=2)
# [num_points]
dist_pred, _ = torch.min(dist_mat, dim=-1)
dist_pred = torch.clamp(dist_pred, max=2.0)
dist_pred = dist_pred.mean()
dist_gt, _ = torch.min(dist_mat, dim=0)
dist_gt = torch.clamp(dist_gt, max=2.0)
dist_gt = dist_gt.mean()
dist = dist_pred + dist_gt
return dist
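# A hedged numeric sketch of chamfer_distance on toy point sets: an
# identical pair gives 0, and shifting one set by 1 along x gives 1.0
# (each directed mean distance contributes 0.5).
def _demo_chamfer_distance():
    pred = torch.tensor([[0., 0.], [1., 0.]])
    assert chamfer_distance(pred, pred).item() == 0.
    gt = pred + torch.tensor([1., 0.])
    assert abs(chamfer_distance(pred, gt).item() - 1.0) < 1e-6
    return chamfer_distance(pred, gt)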
@MATCH_COST.register_module()
class ClsSigmoidCost:
"""ClsSoftmaxCost.
Args:
weight (int | float, optional): loss_weight
"""
def __init__(self, weight=1.):
self.weight = weight
def __call__(self, cls_pred, gt_labels):
"""
Args:
cls_pred (Tensor): Predicted classification logits, shape
[num_query, num_class].
gt_labels (Tensor): Label of `gt_bboxes`, shape (num_gt,).
Returns:
torch.Tensor: cls_cost value with weight
"""
# Following the official DETR repo, contrary to the loss that
# NLL is used, we approximate it in 1 - cls_score[gt_label].
# The 1 is a constant that doesn't change the matching,
# so it can be omitted.
cls_score = cls_pred.sigmoid()
cls_cost = -cls_score[:, gt_labels]
return cls_cost * self.weight
@MATCH_COST.register_module()
class LinesFixNumChamferCost(object):
"""BBox3DL1Cost.
Args:
weight (int | float, optional): loss_weight
"""
def __init__(self, weight=1.):
self.weight = weight
def __call__(self, lines_pred, gt_lines):
"""
Args:
lines_pred (Tensor): predicted normalized lines:
[num_query, num_points, 2]
gt_lines (Tensor): Ground truth lines
[num_gt, num_points, 2]
Returns:
torch.Tensor: reg_cost value with weight
shape [num_pred, num_gt]
"""
num_gts, num_bboxes = gt_lines.size(0), lines_pred.size(0)
        dist_mat = lines_pred.new_full((num_bboxes, num_gts), 1.0)
for i in range(num_bboxes):
for j in range(num_gts):
dist_mat[i, j] = chamfer_distance(
lines_pred[i], gt_lines[j])
return dist_mat * self.weight
@MATCH_COST.register_module()
class LinesCost(object):
"""LinesL1Cost.
Args:
weight (int | float, optional): loss_weight
"""
def __init__(self, weight=1.):
self.weight = weight
def __call__(self, lines_pred, gt_lines, **kwargs):
"""
Args:
lines_pred (Tensor): predicted normalized lines:
[num_query, num_points, 2]
gt_lines (Tensor): Ground truth lines
[num_gt, num_points, 2]
Returns:
torch.Tensor: reg_cost value with weight
shape [num_pred, num_gt]
"""
        pred_flat = lines_pred.flatten(1, 2)
        gt_flat = gt_lines.flatten(1, 2)
div_ = pred_flat.size(-1)
dist_mat = torch.cdist(pred_flat, gt_flat, p=1) / div_
return dist_mat * self.weight
@MATCH_COST.register_module()
class BBoxCostC:
"""BBoxL1Cost.
Args:
weight (int | float, optional): loss_weight
box_format (str, optional): 'xyxy' for DETR, 'xywh' for Sparse_RCNN
Examples:
>>> from mmdet.core.bbox.match_costs.match_cost import BBoxL1Cost
>>> import torch
>>> self = BBoxL1Cost()
>>> bbox_pred = torch.rand(1, 4)
>>> gt_bboxes= torch.FloatTensor([[0, 0, 2, 4], [1, 2, 3, 4]])
>>> factor = torch.tensor([10, 8, 10, 8])
>>> self(bbox_pred, gt_bboxes, factor)
tensor([[1.6172, 1.6422]])
"""
def __init__(self, weight=1., box_format='xyxy'):
self.weight = weight
assert box_format in ['xyxy', 'xywh']
self.box_format = box_format
def __call__(self, bbox_pred, gt_bboxes):
"""
Args:
bbox_pred (Tensor): Predicted boxes with normalized coordinates
(cx, cy, w, h), which are all in range [0, 1]. Shape
[num_query, 4].
gt_bboxes (Tensor): Ground truth boxes with normalized
coordinates (x1, y1, x2, y2). Shape [num_gt, 4].
Returns:
torch.Tensor: bbox_cost value with weight
"""
# if self.box_format == 'xywh':
# gt_bboxes = bbox_xyxy_to_cxcywh(gt_bboxes)
# elif self.box_format == 'xyxy':
# bbox_pred = bbox_cxcywh_to_xyxy(bbox_pred)
bbox_cost = torch.cdist(bbox_pred, gt_bboxes, p=1)
return bbox_cost * self.weight
@MATCH_COST.register_module()
class IoUCostC:
"""IoUCost.
Args:
iou_mode (str, optional): iou mode such as 'iou' | 'giou'
weight (int | float, optional): loss weight
Examples:
>>> from mmdet.core.bbox.match_costs.match_cost import IoUCost
>>> import torch
>>> self = IoUCost()
>>> bboxes = torch.FloatTensor([[1,1, 2, 2], [2, 2, 3, 4]])
>>> gt_bboxes = torch.FloatTensor([[0, 0, 2, 4], [1, 2, 3, 4]])
>>> self(bboxes, gt_bboxes)
tensor([[-0.1250, 0.1667],
[ 0.1667, -0.5000]])
"""
def __init__(self, iou_mode='giou', weight=1., box_format='xywh'):
self.weight = weight
self.iou_mode = iou_mode
assert box_format in ['xyxy', 'xywh']
self.box_format = box_format
def __call__(self, bboxes, gt_bboxes):
"""
Args:
bboxes (Tensor): Predicted boxes with unnormalized coordinates
(x1, y1, x2, y2). Shape [num_query, 4].
gt_bboxes (Tensor): Ground truth boxes with unnormalized
coordinates (x1, y1, x2, y2). Shape [num_gt, 4].
Returns:
torch.Tensor: iou_cost value with weight
"""
if self.box_format == 'xywh':
bboxes = bbox_cxcywh_to_xyxy(bboxes)
gt_bboxes = bbox_cxcywh_to_xyxy(gt_bboxes)
# overlaps: [num_bboxes, num_gt]
overlaps = bbox_overlaps(
bboxes, gt_bboxes, mode=self.iou_mode, is_aligned=False)
# The 1 is a constant that doesn't change the matching, so omitted.
iou_cost = -overlaps
return iou_cost * self.weight
@MATCH_COST.register_module()
class DynamicLinesCost(object):
"""LinesL1Cost.
Args:
weight (int | float, optional): loss_weight
"""
def __init__(self, weight=1.):
self.weight = weight
def __call__(self, lines_pred, lines_gt, masks_pred, masks_gt):
"""
Args:
lines_pred (Tensor): predicted normalized lines:
[nP, num_points, 2]
lines_gt (Tensor): Ground truth lines
[nG, num_points, 2]
masks_pred: [nP, num_points]
masks_gt: [nG, num_points]
Returns:
dist_mat: reg_cost value with weight
shape [nP, nG]
"""
dist_mat = self.cal_dist(lines_pred, lines_gt)
dist_mat = self.get_dynamic_line(dist_mat, masks_pred, masks_gt)
dist_mat = dist_mat * self.weight
return dist_mat
def cal_dist(self, x1, x2):
'''
Args:
x1: B1,N,2
x2: B2,N,2
Return:
dist_mat: B1,B2,N
'''
x1 = x1.permute(1, 0, 2)
x2 = x2.permute(1, 0, 2)
dist_mat = torch.cdist(x1, x2, p=2)
dist_mat = dist_mat.permute(1, 2, 0)
return dist_mat
def get_dynamic_line(self, mat, m1, m2):
        '''
        Average the per-point distances over valid points only.
        mat: N1 x N2 x npts
        m1: N1 x npts
        m2: N2 x npts
        '''
# nPxnGxnum_points
m1 = m1.unsqueeze(1).sigmoid() > 0.5
m2 = m2.unsqueeze(0)
        valid_points_mask = (m1 + m2) / 2.
        # average over valid points; guard empty pairs against division by zero
        average_factor = valid_points_mask.sum(-1)
        average_factor = average_factor.masked_fill(average_factor == 0, 1)
        # take the average
        mat = mat * valid_points_mask
        mat = mat.sum(-1) / average_factor
return mat
@MATCH_COST.register_module()
class BBoxLogitsCost(object):
"""BBoxLogits.
Args:
weight (int | float, optional): loss_weight
"""
def __init__(self, weight=1.):
self.weight = weight
def calNLL(self, logits, value):
'''
Args:
logits: B1, 8, cls_dim
value: B2, 8,
Return:
log_likelihood: B1,B2,8
'''
logits = logits[:, None]
value = value[None]
value = value.long().unsqueeze(-1)
value, log_pmf = torch.broadcast_tensors(value, logits)
value = value[..., :1]
return log_pmf.gather(-1, value).squeeze(-1)
def __call__(self, bbox_pred, bbox_gt, **kwargs):
"""
Args:
bbox_pred: nproposal, 4*2, pos_dim
bbox_gt: ngt, 4*2
Returns:
cost: nproposal, ngt
"""
cost = self.calNLL(bbox_pred, bbox_gt).mean(-1)
return cost * self.weight
@MATCH_COST.register_module()
class MapQueriesCost(object):
def __init__(self, cls_cost, reg_cost, iou_cost=None):
self.cls_cost = build_match_cost(cls_cost)
self.reg_cost = build_match_cost(reg_cost)
self.iou_cost = None
if iou_cost is not None:
self.iou_cost = build_match_cost(iou_cost)
def __call__(self, preds: dict, gts: dict):
# classification and bboxcost.
cls_cost = self.cls_cost(preds['scores'], gts['labels'])
# regression cost
regkwargs = {}
if 'masks' in preds and 'masks' in gts:
            assert isinstance(self.reg_cost, DynamicLinesCost), \
                'masks are only supported by DynamicLinesCost'
regkwargs = {
'masks_pred': preds['masks'],
'masks_gt': gts['masks'],
}
reg_cost = self.reg_cost(preds['lines'], gts['lines'], **regkwargs)
# weighted sum of above three costs
cost = cls_cost + reg_cost
# Iou
if self.iou_cost is not None:
            iou_cost = self.iou_cost(preds['lines'], gts['lines'])
cost += iou_cost
return cost
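# A hedged sketch: combine the classification and line costs directly
# (bypassing the registry) on toy inputs with 2 queries and 1 gt line.
def _demo_map_queries_cost():
    preds = {'scores': torch.tensor([[2.0, -2.0], [-2.0, 2.0]]),
             'lines': torch.zeros(2, 4, 2)}
    gts = {'labels': torch.tensor([0]), 'lines': torch.zeros(1, 4, 2)}
    cls_cost = ClsSigmoidCost()(preds['scores'], gts['labels'])  # (2, 1)
    reg_cost = LinesCost()(preds['lines'], gts['lines'])         # (2, 1)
    cost = cls_cost + reg_cost
    # query 0 scores high on class 0, so it is the cheaper match
    assert cost[0, 0] < cost[1, 0]
    return cost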
import torch
import torch.nn as nn
import torch.nn.functional as F
class NoiseSythesis(nn.Module):
def __init__(self,
p, scale=0.01, shift_scale=(8,5),
scaling_size=(0.1,0.1), canvas_size=(200, 100),
bbox_type='sce',
poly_coord_dim=2,
bbox_coord_dim=2,
quantify=True):
super(NoiseSythesis, self).__init__()
self.p = p
self.scale = scale
self.bbox_type = bbox_type
self.quantify = quantify
self.poly_coord_dim = poly_coord_dim
self.bbox_coord_dim = bbox_coord_dim
self.transforms = [self.random_shifting, self.random_scaling]
# self.transforms = [self.random_scaling]
self.register_buffer('canvas_size', torch.tensor(canvas_size))
self.register_buffer('shift_scale', torch.tensor(shift_scale).float())
self.register_buffer('scaling_size', torch.tensor(scaling_size))
def random_scaling(self, bbox):
'''
bbox: B, paramter_num, 2
'''
device = bbox.device
dtype = bbox.dtype
B = bbox.shape[0]
noise = (torch.rand(B, device=device)*2-1)[:,None,None] # [-1,1]
scale = self.scaling_size.to(device)
scale = (noise * scale) + 1
scaled_bbox = bbox * scale
# recenterization
coffset = scaled_bbox.mean(-2) - bbox.float().mean(-2)
scaled_bbox = scaled_bbox - coffset[:,None]
return scaled_bbox.round().type(dtype)
def random_shifting(self, bbox):
'''
bbox: B, paramter_num, 2
'''
device = bbox.device
batch_size = bbox.shape[0]
shift_scale = self.shift_scale
scale = (bbox.max(1)[0] - bbox.min(1)[0]) * 0.1
scale = torch.where(scale < shift_scale, scale, shift_scale)
noise = (torch.rand(batch_size, 2, device=device)*2-1) # [-1,1]
offset = (noise * scale).round().type(bbox.dtype)
shifted_bbox = bbox + offset[:, None]
return shifted_bbox
def gaussian_noise_bbox(self, bbox):
dtype = bbox.dtype
batch_size = bbox.shape[0]
scale = (self.canvas_size * self.scale)[:self.bbox_coord_dim]
        # torch.normal needs matching element counts for tensor mean/std, so
        # broadcast the per-axis std via randn_like instead
        noisy_bbox = bbox.float() + torch.randn_like(bbox, dtype=torch.float) * scale
if self.quantify:
noisy_bbox = noisy_bbox.round().type(dtype)
# prevent out of bound case
for i in range(self.bbox_coord_dim):
                noisy_bbox[..., i] = \
                    torch.clamp(noisy_bbox[..., i], 1, self.canvas_size[i])
else:
noisy_bbox = noisy_bbox.type(torch.float)
return noisy_bbox
def gaussian_noise_poly(self, polyline, polyline_mask):
device = polyline.device
batchsize = polyline.shape[0]
scale = self.canvas_size * self.scale
polyline = F.pad(polyline,(0,self.poly_coord_dim-1))
polyline = polyline.view(batchsize,-1, self.poly_coord_dim)
mask = F.pad(polyline_mask[:,1:],(0,self.poly_coord_dim))
        # same broadcasting workaround as in gaussian_noise_bbox
        noisy_polyline = polyline.float() + torch.randn_like(polyline, dtype=torch.float) * scale
if self.quantify:
noisy_polyline = noisy_polyline.round().type(polyline.dtype)
# prevent out of bound case
for i in range(self.poly_coord_dim):
noisy_polyline[...,i] =\
torch.clamp(noisy_polyline[...,i],0,self.canvas_size[i])
else:
noisy_polyline = noisy_polyline.type(torch.float)
noisy_polyline = noisy_polyline.view(batchsize,-1) * mask
noisy_polyline = noisy_polyline[:,:-(self.poly_coord_dim-1)]
return noisy_polyline
def random_apply(self, bbox):
for t in self.transforms:
if self.p < torch.rand(1):
continue
bbox = t(bbox)
# prevent out of bound case
bbox[...,0] =\
torch.clamp(bbox[...,0],0,self.canvas_size[0])
bbox[...,1] =\
torch.clamp(bbox[...,1],0,self.canvas_size[1])
return bbox
def simple_aug(self, batch):
# augment bbox
if self.bbox_type in ['sce', 'xyxy']:
fbbox = batch['bbox_flat']
seq_len = fbbox.shape[0]
bbox = fbbox.view(seq_len, -1, 2)
bbox = self.gaussian_noise_bbox(bbox)
fbbox_aug = bbox.view(seq_len, -1)
aug_mask = torch.rand(fbbox.shape,device=fbbox.device)
fbbox = torch.where(aug_mask<self.p, fbbox_aug, fbbox)
elif self.bbox_type == 'rxyxy':
fbbox = self.rbbox_aug(batch)
elif self.bbox_type == 'convex_hull':
fbbox = self.convex_hull_aug(batch)
# augment
polyline = batch['polylines']
polyline_mask = batch['polyline_masks']
polyline_aug = self.gaussian_noise_poly(polyline, polyline_mask)
aug_mask = torch.rand(polyline.shape,device=polyline.device)
polyline = torch.where(aug_mask<self.p, polyline_aug, polyline)
return polyline, fbbox
def rbbox_aug(self, batch):
return None
def convex_hull_aug(self,batch):
return None
def __call__(self, batch, simple_aug=False):
if simple_aug:
return self.simple_aug(batch)
else:
fbbox = batch['bbox_flat']
seq_len = fbbox.shape[0]
bbox = fbbox.view(seq_len, -1, self.bbox_coord_dim)
aug_bbox = self.random_apply(bbox)
aug_bbox_flat = aug_bbox.view(seq_len, -1)
return aug_bbox_flat
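# A hedged usage sketch: apply the shift/scale augmentations to a toy batch
# of two boxes and check the result stays inside the canvas. All sizes are
# illustrative assumptions.
def _demo_noise_synthesis():
    aug = NoiseSythesis(p=1.0, canvas_size=(200, 100))
    bbox = torch.tensor([[[10., 10.], [50., 40.]],
                         [[60., 20.], [90., 80.]]])
    out = aug.random_apply(bbox)
    assert out.shape == bbox.shape
    assert (out[..., 0] <= 200).all() and (out[..., 1] <= 100).all()
    return out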
from .ipm_backbone import IPMEncoder
__all__ = [
'IPMEncoder'
]
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
import torch
import torch.nn as nn
from collections import OrderedDict
import torch.utils.checkpoint as checkpoint
from timm.models.layers import trunc_normal_, DropPath
from mmcv.runner import _load_checkpoint
from mmcv.cnn import constant_init, trunc_normal_init
from mmseg.utils import get_root_logger
from ops_dcnv3 import modules as opsm
import torch.nn.functional as F
from mmdet.models.builder import BACKBONES
class to_channels_first(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 3, 1, 2)
class to_channels_last(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 2, 3, 1)
def build_norm_layer(dim,
norm_layer,
in_format='channels_last',
out_format='channels_last',
eps=1e-6):
layers = []
if norm_layer == 'BN':
if in_format == 'channels_last':
layers.append(to_channels_first())
layers.append(nn.BatchNorm2d(dim))
if out_format == 'channels_last':
layers.append(to_channels_last())
elif norm_layer == 'LN':
if in_format == 'channels_first':
layers.append(to_channels_last())
layers.append(nn.LayerNorm(dim, eps=eps))
if out_format == 'channels_first':
layers.append(to_channels_first())
else:
raise NotImplementedError(
f'build_norm_layer does not support {norm_layer}')
return nn.Sequential(*layers)
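# A hedged sketch: a channels_last tensor normalised with BN through the
# conversion wrappers above keeps its (B, H, W, C) layout.
def _demo_build_norm_layer():
    norm = build_norm_layer(8, 'BN', in_format='channels_last',
                            out_format='channels_last')
    x = torch.randn(2, 4, 4, 8)  # (B, H, W, C)
    assert norm(x).shape == (2, 4, 4, 8)
    return norm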
def build_act_layer(act_layer):
if act_layer == 'ReLU':
return nn.ReLU(inplace=True)
elif act_layer == 'SiLU':
return nn.SiLU(inplace=True)
elif act_layer == 'GELU':
return nn.GELU()
raise NotImplementedError(f'build_act_layer does not support {act_layer}')
class CrossAttention(nn.Module):
r""" Cross Attention Module
Args:
dim (int): Number of input channels.
num_heads (int): Number of attention heads. Default: 8
qkv_bias (bool, optional): If True, add a learnable bias to q, k, v.
Default: False.
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
attn_drop (float, optional): Dropout ratio of attention weight.
Default: 0.0
proj_drop (float, optional): Dropout ratio of output. Default: 0.0
attn_head_dim (int, optional): Dimension of attention head.
out_dim (int, optional): Dimension of output.
"""
def __init__(self,
dim,
num_heads=8,
qkv_bias=False,
qk_scale=None,
attn_drop=0.,
proj_drop=0.,
attn_head_dim=None,
out_dim=None):
super().__init__()
if out_dim is None:
out_dim = dim
self.num_heads = num_heads
head_dim = dim // num_heads
if attn_head_dim is not None:
head_dim = attn_head_dim
all_head_dim = head_dim * self.num_heads
self.scale = qk_scale or head_dim ** -0.5
assert all_head_dim == dim
self.q = nn.Linear(dim, all_head_dim, bias=False)
self.k = nn.Linear(dim, all_head_dim, bias=False)
self.v = nn.Linear(dim, all_head_dim, bias=False)
if qkv_bias:
self.q_bias = nn.Parameter(torch.zeros(all_head_dim))
self.k_bias = nn.Parameter(torch.zeros(all_head_dim))
self.v_bias = nn.Parameter(torch.zeros(all_head_dim))
else:
self.q_bias = None
self.k_bias = None
self.v_bias = None
self.attn_drop = nn.Dropout(attn_drop)
self.proj = nn.Linear(all_head_dim, out_dim)
self.proj_drop = nn.Dropout(proj_drop)
def forward(self, x, k=None, v=None):
B, N, C = x.shape
N_k = k.shape[1]
N_v = v.shape[1]
q_bias, k_bias, v_bias = None, None, None
if self.q_bias is not None:
q_bias = self.q_bias
k_bias = self.k_bias
v_bias = self.v_bias
q = F.linear(input=x, weight=self.q.weight, bias=q_bias)
q = q.reshape(B, N, 1, self.num_heads,
-1).permute(2, 0, 3, 1,
4).squeeze(0) # (B, N_head, N_q, dim)
k = F.linear(input=k, weight=self.k.weight, bias=k_bias)
k = k.reshape(B, N_k, 1, self.num_heads, -1).permute(2, 0, 3, 1,
4).squeeze(0)
v = F.linear(input=v, weight=self.v.weight, bias=v_bias)
v = v.reshape(B, N_v, 1, self.num_heads, -1).permute(2, 0, 3, 1,
4).squeeze(0)
q = q * self.scale
attn = (q @ k.transpose(-2, -1)) # (B, N_head, N_q, N_k)
attn = attn.softmax(dim=-1)
attn = self.attn_drop(attn)
x = (attn @ v).transpose(1, 2).reshape(B, N, -1)
x = self.proj(x)
x = self.proj_drop(x)
return x
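# A hedged shape walk-through of CrossAttention: one query token attending
# over 16 key/value tokens, with illustrative sizes (dim=32, 8 heads).
def _demo_cross_attention():
    attn = CrossAttention(dim=32, num_heads=8)
    q = torch.randn(2, 1, 32)    # (B, N_q, C)
    kv = torch.randn(2, 16, 32)  # (B, N_kv, C)
    out = attn(q, k=kv, v=kv)
    assert out.shape == (2, 1, 32)
    return out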
class AttentiveBlock(nn.Module):
r"""Attentive Block
Args:
dim (int): Number of input channels.
num_heads (int): Number of attention heads. Default: 8
qkv_bias (bool, optional): If True, add a learnable bias to q, k, v.
Default: False.
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
drop (float, optional): Dropout rate. Default: 0.0.
attn_drop (float, optional): Attention dropout rate. Default: 0.0.
drop_path (float | tuple[float], optional): Stochastic depth rate.
Default: 0.0.
norm_layer (nn.Module, optional): Normalization layer. Default: nn.LayerNorm.
attn_head_dim (int, optional): Dimension of attention head. Default: None.
out_dim (int, optional): Dimension of output. Default: None.
"""
def __init__(self,
dim,
num_heads,
qkv_bias=False,
qk_scale=None,
drop=0.,
attn_drop=0.,
drop_path=0.,
norm_layer="LN",
attn_head_dim=None,
out_dim=None):
super().__init__()
self.norm1_q = build_norm_layer(dim, norm_layer, eps=1e-6)
self.norm1_k = build_norm_layer(dim, norm_layer, eps=1e-6)
self.norm1_v = build_norm_layer(dim, norm_layer, eps=1e-6)
self.cross_dcn = CrossAttention(dim,
num_heads=num_heads,
qkv_bias=qkv_bias,
qk_scale=qk_scale,
attn_drop=attn_drop,
proj_drop=drop,
attn_head_dim=attn_head_dim,
out_dim=out_dim)
self.drop_path = DropPath(
drop_path) if drop_path > 0. else nn.Identity()
def forward(self,
x_q,
x_kv,
pos_q,
pos_k,
bool_masked_pos,
rel_pos_bias=None):
x_q = self.norm1_q(x_q + pos_q)
x_k = self.norm1_k(x_kv + pos_k)
x_v = self.norm1_v(x_kv)
x = self.cross_dcn(x_q, k=x_k, v=x_v)
return x
class AttentionPoolingBlock(AttentiveBlock):
def forward(self, x):
x_q = x.mean(1, keepdim=True)
x_kv = x
pos_q, pos_k = 0, 0
x = super().forward(x_q, x_kv, pos_q, pos_k,
bool_masked_pos=None,
rel_pos_bias=None)
x = x.squeeze(1)
return x
class StemLayer(nn.Module):
r""" Stem layer of InternImage
Args:
in_chans (int): number of input channels
out_chans (int): number of output channels
act_layer (str): activation layer
norm_layer (str): normalization layer
"""
def __init__(self,
in_chans=3,
out_chans=96,
act_layer='GELU',
norm_layer='BN'):
super().__init__()
self.conv1 = nn.Conv2d(in_chans,
out_chans // 2,
kernel_size=3,
stride=2,
padding=1)
self.norm1 = build_norm_layer(out_chans // 2, norm_layer,
'channels_first', 'channels_first')
self.act = build_act_layer(act_layer)
self.conv2 = nn.Conv2d(out_chans // 2,
out_chans,
kernel_size=3,
stride=2,
padding=1)
self.norm2 = build_norm_layer(out_chans, norm_layer, 'channels_first',
'channels_last')
def forward(self, x):
x = self.conv1(x)
x = self.norm1(x)
x = self.act(x)
x = self.conv2(x)
x = self.norm2(x)
return x
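# A hedged sketch: the stem downsamples 4x spatially and emits channels_last
# features, e.g. a 64x64 RGB image becomes a (1, 16, 16, 96) map.
def _demo_stem_layer():
    stem = StemLayer(in_chans=3, out_chans=96)
    out = stem(torch.randn(1, 3, 64, 64))
    assert out.shape == (1, 16, 16, 96)  # (B, H/4, W/4, C)
    return out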
class DownsampleLayer(nn.Module):
r""" Downsample layer of InternImage
Args:
channels (int): number of input channels
norm_layer (str): normalization layer
"""
def __init__(self, channels, norm_layer='LN'):
super().__init__()
self.conv = nn.Conv2d(channels,
2 * channels,
kernel_size=3,
stride=2,
padding=1,
bias=False)
self.norm = build_norm_layer(2 * channels, norm_layer,
'channels_first', 'channels_last')
def forward(self, x):
x = self.conv(x.permute(0, 3, 1, 2))
x = self.norm(x)
return x
class MLPLayer(nn.Module):
r""" MLP layer of InternImage
Args:
in_features (int): number of input features
hidden_features (int): number of hidden features
out_features (int): number of output features
act_layer (str): activation layer
drop (float): dropout rate
"""
def __init__(self,
in_features,
hidden_features=None,
out_features=None,
act_layer='GELU',
drop=0.):
super().__init__()
out_features = out_features or in_features
hidden_features = hidden_features or in_features
self.fc1 = nn.Linear(in_features, hidden_features)
self.act = build_act_layer(act_layer)
self.fc2 = nn.Linear(hidden_features, out_features)
self.drop = nn.Dropout(drop)
def forward(self, x):
x = self.fc1(x)
x = self.act(x)
x = self.drop(x)
x = self.fc2(x)
x = self.drop(x)
return x
class InternImageLayer(nn.Module):
r""" Basic layer of InternImage
Args:
core_op (nn.Module): core operation of InternImage
channels (int): number of input channels
groups (list): Groups of each block.
mlp_ratio (float): ratio of mlp hidden features to input channels
drop (float): dropout rate
drop_path (float): drop path rate
act_layer (str): activation layer
norm_layer (str): normalization layer
post_norm (bool): whether to use post normalization
layer_scale (float): layer scale
offset_scale (float): offset scale
with_cp (bool): whether to use checkpoint
"""
def __init__(self,
core_op,
channels,
groups,
mlp_ratio=4.,
drop=0.,
drop_path=0.,
act_layer='GELU',
norm_layer='LN',
post_norm=False,
layer_scale=None,
offset_scale=1.0,
with_cp=False,
dw_kernel_size=None, # for InternImage-H/G
res_post_norm=False, # for InternImage-H/G
center_feature_scale=False): # for InternImage-H/G
super().__init__()
self.channels = channels
self.groups = groups
self.mlp_ratio = mlp_ratio
self.with_cp = with_cp
self.norm1 = build_norm_layer(channels, 'LN')
self.post_norm = post_norm
self.dcn = core_op(
channels=channels,
kernel_size=3,
stride=1,
pad=1,
dilation=1,
group=groups,
offset_scale=offset_scale,
act_layer=act_layer,
norm_layer=norm_layer,
dw_kernel_size=dw_kernel_size, # for InternImage-H/G
center_feature_scale=center_feature_scale) # for InternImage-H/G
self.drop_path = DropPath(drop_path) if drop_path > 0. \
else nn.Identity()
self.norm2 = build_norm_layer(channels, 'LN')
self.mlp = MLPLayer(in_features=channels,
hidden_features=int(channels * mlp_ratio),
act_layer=act_layer,
drop=drop)
self.layer_scale = layer_scale is not None
if self.layer_scale:
self.gamma1 = nn.Parameter(layer_scale * torch.ones(channels),
requires_grad=True)
self.gamma2 = nn.Parameter(layer_scale * torch.ones(channels),
requires_grad=True)
self.res_post_norm = res_post_norm
if res_post_norm:
self.res_post_norm1 = build_norm_layer(channels, 'LN')
self.res_post_norm2 = build_norm_layer(channels, 'LN')
def forward(self, x):
def _inner_forward(x):
if not self.layer_scale:
if self.post_norm:
x = x + self.drop_path(self.norm1(self.dcn(x)))
x = x + self.drop_path(self.norm2(self.mlp(x)))
elif self.res_post_norm: # for InternImage-H/G
x = x + self.drop_path(self.res_post_norm1(self.dcn(self.norm1(x))))
x = x + self.drop_path(self.res_post_norm2(self.mlp(self.norm2(x))))
else:
x = x + self.drop_path(self.dcn(self.norm1(x)))
x = x + self.drop_path(self.mlp(self.norm2(x)))
return x
if self.post_norm:
x = x + self.drop_path(self.gamma1 * self.norm1(self.dcn(x)))
x = x + self.drop_path(self.gamma2 * self.norm2(self.mlp(x)))
else:
x = x + self.drop_path(self.gamma1 * self.dcn(self.norm1(x)))
x = x + self.drop_path(self.gamma2 * self.mlp(self.norm2(x)))
return x
if self.with_cp and x.requires_grad:
x = checkpoint.checkpoint(_inner_forward, x)
else:
x = _inner_forward(x)
return x
class InternImageBlock(nn.Module):
r""" Block of InternImage
Args:
core_op (nn.Module): core operation of InternImage
channels (int): number of input channels
depths (list): Depth of each block.
groups (list): Groups of each block.
mlp_ratio (float): ratio of mlp hidden features to input channels
drop (float): dropout rate
drop_path (float): drop path rate
act_layer (str): activation layer
norm_layer (str): normalization layer
post_norm (bool): whether to use post normalization
layer_scale (float): layer scale
offset_scale (float): offset scale
with_cp (bool): whether to use checkpoint
"""
def __init__(self,
core_op,
channels,
depth,
groups,
downsample=True,
mlp_ratio=4.,
drop=0.,
drop_path=0.,
act_layer='GELU',
norm_layer='LN',
post_norm=False,
offset_scale=1.0,
layer_scale=None,
with_cp=False,
dw_kernel_size=None, # for InternImage-H/G
post_norm_block_ids=None, # for InternImage-H/G
res_post_norm=False, # for InternImage-H/G
center_feature_scale=False): # for InternImage-H/G
super().__init__()
self.channels = channels
self.depth = depth
self.post_norm = post_norm
self.center_feature_scale = center_feature_scale
self.blocks = nn.ModuleList([
InternImageLayer(
core_op=core_op,
channels=channels,
groups=groups,
mlp_ratio=mlp_ratio,
drop=drop,
drop_path=drop_path[i] if isinstance(
drop_path, list) else drop_path,
act_layer=act_layer,
norm_layer=norm_layer,
post_norm=post_norm,
layer_scale=layer_scale,
offset_scale=offset_scale,
with_cp=with_cp,
dw_kernel_size=dw_kernel_size, # for InternImage-H/G
res_post_norm=res_post_norm, # for InternImage-H/G
center_feature_scale=center_feature_scale # for InternImage-H/G
) for i in range(depth)
])
if not self.post_norm or center_feature_scale:
self.norm = build_norm_layer(channels, 'LN')
self.post_norm_block_ids = post_norm_block_ids
if post_norm_block_ids is not None: # for InternImage-H/G
self.post_norms = nn.ModuleList(
[build_norm_layer(channels, 'LN', eps=1e-6) for _ in post_norm_block_ids]
)
self.downsample = DownsampleLayer(
channels=channels, norm_layer=norm_layer) if downsample else None
def forward(self, x, return_wo_downsample=False):
for i, blk in enumerate(self.blocks):
x = blk(x)
if (self.post_norm_block_ids is not None) and (i in self.post_norm_block_ids):
index = self.post_norm_block_ids.index(i)
x = self.post_norms[index](x) # for InternImage-H/G
if not self.post_norm or self.center_feature_scale:
x = self.norm(x)
if return_wo_downsample:
x_ = x
if self.downsample is not None:
x = self.downsample(x)
if return_wo_downsample:
return x, x_
return x
@BACKBONES.register_module()
class InternImage(nn.Module):
r""" InternImage
A PyTorch impl of : `InternImage: Exploring Large-Scale Vision Foundation Models with Deformable Convolutions` -
    https://arxiv.org/abs/2211.05778
Args:
core_op (str): Core operator. Default: 'DCNv3'
        channels (int): Number of channels in the first stage. Default: 64
depths (list): Depth of each block. Default: [3, 4, 18, 5]
groups (list): Groups of each block. Default: [3, 6, 12, 24]
mlp_ratio (float): Ratio of mlp hidden dim to embedding dim. Default: 4.
drop_rate (float): Probability of an element to be zeroed. Default: 0.
drop_path_rate (float): Stochastic depth rate. Default: 0.
act_layer (str): Activation layer. Default: 'GELU'
norm_layer (str): Normalization layer. Default: 'LN'
layer_scale (bool): Whether to use layer scale. Default: False
cls_scale (bool): Whether to use class scale. Default: False
        with_cp (bool): Use gradient checkpointing or not. Using checkpointing
            will save some memory while slowing down the training speed.
        dw_kernel_size (int): Size of the dwconv. Default: None
level2_post_norm (bool): Whether to use level2 post norm. Default: False
level2_post_norm_block_ids (list): Indexes of post norm blocks. Default: None
res_post_norm (bool): Whether to use res post norm. Default: False
center_feature_scale (bool): Whether to use center feature scale. Default: False
"""
def __init__(self,
core_op='DCNv3',
channels=64,
depths=[3, 4, 18, 5],
groups=[3, 6, 12, 24],
mlp_ratio=4.,
drop_rate=0.,
drop_path_rate=0.2,
drop_path_type='linear',
act_layer='GELU',
norm_layer='LN',
layer_scale=None,
offset_scale=1.0,
post_norm=False,
with_cp=False,
dw_kernel_size=None, # for InternImage-H/G
level2_post_norm=False, # for InternImage-H/G
level2_post_norm_block_ids=None, # for InternImage-H/G
res_post_norm=False, # for InternImage-H/G
center_feature_scale=False, # for InternImage-H/G
out_indices=(0, 1, 2, 3),
init_cfg=None,
**kwargs):
super().__init__()
self.core_op = core_op
self.num_levels = len(depths)
self.depths = depths
self.channels = channels
self.num_features = int(channels * 2**(self.num_levels - 1))
self.post_norm = post_norm
self.mlp_ratio = mlp_ratio
self.init_cfg = init_cfg
self.out_indices = out_indices
self.level2_post_norm_block_ids = level2_post_norm_block_ids
# logger = get_root_logger()
# logger.info(f'using core type: {core_op}')
# logger.info(f'using activation layer: {act_layer}')
# logger.info(f'using main norm layer: {norm_layer}')
# logger.info(f'using dpr: {drop_path_type}, {drop_path_rate}')
# logger.info(f"level2_post_norm: {level2_post_norm}")
# logger.info(f"level2_post_norm_block_ids: {level2_post_norm_block_ids}")
# logger.info(f"res_post_norm: {res_post_norm}")
in_chans = 3
self.patch_embed = StemLayer(in_chans=in_chans,
out_chans=channels,
act_layer=act_layer,
norm_layer=norm_layer)
self.pos_drop = nn.Dropout(p=drop_rate)
dpr = [
x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))
]
if drop_path_type == 'uniform':
for i in range(len(dpr)):
dpr[i] = drop_path_rate
self.levels = nn.ModuleList()
for i in range(self.num_levels):
post_norm_block_ids = level2_post_norm_block_ids if level2_post_norm and (
i == 2) else None # for InternImage-H/G
level = InternImageBlock(
core_op=getattr(opsm, core_op),
channels=int(channels * 2**i),
depth=depths[i],
groups=groups[i],
mlp_ratio=self.mlp_ratio,
drop=drop_rate,
drop_path=dpr[sum(depths[:i]):sum(depths[:i + 1])],
act_layer=act_layer,
norm_layer=norm_layer,
post_norm=post_norm,
downsample=(i < self.num_levels - 1),
layer_scale=layer_scale,
offset_scale=offset_scale,
with_cp=with_cp,
dw_kernel_size=dw_kernel_size, # for InternImage-H/G
post_norm_block_ids=post_norm_block_ids, # for InternImage-H/G
res_post_norm=res_post_norm, # for InternImage-H/G
center_feature_scale=center_feature_scale # for InternImage-H/G
)
self.levels.append(level)
self.num_layers = len(depths)
self.apply(self._init_weights)
self.apply(self._init_deform_weights)
def init_weights(self):
logger = get_root_logger()
if self.init_cfg is None:
            logger.warning(f'No pre-trained weights for '
                           f'{self.__class__.__name__}, '
                           f'training starts from scratch')
for m in self.modules():
if isinstance(m, nn.Linear):
trunc_normal_init(m, std=.02, bias=0.)
elif isinstance(m, nn.LayerNorm):
constant_init(m, 1.0)
else:
assert 'checkpoint' in self.init_cfg, f'Only support ' \
f'specify `Pretrained` in ' \
f'`init_cfg` in ' \
f'{self.__class__.__name__} '
ckpt = _load_checkpoint(self.init_cfg.checkpoint,
logger=logger,
map_location='cpu')
if 'state_dict' in ckpt:
_state_dict = ckpt['state_dict']
elif 'model' in ckpt:
_state_dict = ckpt['model']
else:
_state_dict = ckpt
state_dict = OrderedDict()
for k, v in _state_dict.items():
if k.startswith('backbone.'):
state_dict[k[9:]] = v
else:
state_dict[k] = v
# strip prefix of state_dict
if list(state_dict.keys())[0].startswith('module.'):
state_dict = {k[7:]: v for k, v in state_dict.items()}
# load state_dict
            msg = self.load_state_dict(state_dict, strict=False)
            logger.info(msg)
def _init_weights(self, m):
if isinstance(m, nn.Linear):
trunc_normal_(m.weight, std=.02)
if isinstance(m, nn.Linear) and m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.LayerNorm):
nn.init.constant_(m.bias, 0)
nn.init.constant_(m.weight, 1.0)
def _init_deform_weights(self, m):
if isinstance(m, getattr(opsm, self.core_op)):
m._reset_parameters()
def forward(self, x):
x = self.patch_embed(x)
x = self.pos_drop(x)
seq_out = []
for level_idx, level in enumerate(self.levels):
x, x_ = level(x, return_wo_downsample=True)
if level_idx in self.out_indices:
seq_out.append(x_.permute(0, 3, 1, 2).contiguous())
return seq_out
import copy
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmdet3d.models.builder import BACKBONES
from mmdet.models import build_backbone, build_neck
class UpsampleBlock(nn.Module):
def __init__(self, ins, outs):
super(UpsampleBlock, self).__init__()
self.gn = nn.GroupNorm(32, outs)
self.conv = nn.Conv2d(ins, outs, kernel_size=3,
stride=1, padding=1) # same
self.relu = nn.ReLU(inplace=True)
def forward(self, x):
x = self.conv(x)
x = self.relu(self.gn(x))
x = self.upsample2x(x)
return x
def upsample2x(self, x):
_, _, h, w = x.shape
x = F.interpolate(x, size=(h*2, w*2),
mode='bilinear', align_corners=True)
return x
class Upsample(nn.Module):
def __init__(self,
zoom_size=(2, 4, 8),
in_channels=128,
out_channels=128,
):
super(Upsample, self).__init__()
self.out_channels = out_channels
input_conv = UpsampleBlock(in_channels, out_channels)
inter_conv = UpsampleBlock(out_channels, out_channels)
fscale = []
for scale_factor in zoom_size:
layer_num = int(math.log2(scale_factor))
if layer_num < 1:
fscale.append(nn.Identity())
continue
tmp = [copy.deepcopy(input_conv), ]
            tmp += [copy.deepcopy(inter_conv) for _ in range(layer_num - 1)]
fscale.append(nn.Sequential(*tmp))
self.fscale = nn.ModuleList(fscale)
def init_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_uniform_(m.weight, a=1)
nn.init.constant_(m.bias, 0)
def forward(self, imgs):
rescale_i = []
for f, img in zip(self.fscale, imgs):
rescale_i.append(f(img))
out = sum(rescale_i)
return out
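# A minimal usage sketch (shapes are assumptions): four FPN-style feature
# maps at 1x, 1/2, 1/4 and 1/8 of the target resolution are each brought to
# the target size by their zoom factor and then summed into a single map.
import torch
up = Upsample(zoom_size=(1, 2, 4, 8), in_channels=128, out_channels=128)
feats = [torch.randn(1, 128, 64 // s, 64 // s) for s in (1, 2, 4, 8)]
out = up(feats)
print(out.shape)  # torch.Size([1, 128, 64, 64])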
@BACKBONES.register_module()
class IPMEncoder(nn.Module):
    '''
    Encode camera features into a BEV feature map via inverse perspective mapping (IPM).
    '''
def __init__(self,
img_backbone,
img_neck,
upsample,
xbound=[-30.0, 30.0, 0.5],
ybound=[-15.0, 15.0, 0.5],
zbound=[-10.0, 10.0, 20.0],
heights=[-1.1, 0, 0.5, 1.1],
pretrained=None,
out_channels=128,
num_cam=6,
use_lidar=False,
use_image=True,
lidar_dim=128,
):
super(IPMEncoder, self).__init__()
self.x_bound = xbound
self.y_bound = ybound
self.heights = heights
self.num_cam = num_cam
num_x = int((xbound[1] - xbound[0]) / xbound[2])
num_y = int((ybound[1] - ybound[0]) / ybound[2])
self.img_backbone = build_backbone(img_backbone)
self.img_neck = build_neck(img_neck)
self.upsample = Upsample(**upsample)
self.use_image = use_image
self.use_lidar = use_lidar
if self.use_lidar:
self.pp = PointPillarEncoder(lidar_dim, xbound, ybound, zbound)
self.outconvs =\
nn.Conv2d((self.upsample.out_channels+3)*len(heights), out_channels//2,
kernel_size=3, stride=1, padding=1) # same
if self.use_image:
_out_channels = out_channels//2
else:
_out_channels = out_channels
self.outconvs_lidar =\
nn.Conv2d(lidar_dim, _out_channels,
kernel_size=3, stride=1, padding=1) # same
else:
self.outconvs =\
nn.Conv2d((self.upsample.out_channels+3)*len(heights), out_channels,
kernel_size=3, stride=1, padding=1) # same
self.init_weights(pretrained=pretrained)
# bev_plane
bev_planes = [construct_plane_grid(
xbound, ybound, h) for h in self.heights]
self.register_buffer('bev_planes', torch.stack(
bev_planes),) # nlvl,bH,bW,2
self.masked_embeds = nn.Embedding(len(heights), out_channels)
def init_weights(self, pretrained=None):
"""Initialize model weights."""
self.img_backbone.init_weights()
self.img_neck.init_weights()
self.upsample.init_weights()
for p in self.outconvs.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
if self.use_lidar:
for p in self.outconvs_lidar.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
for p in self.pp.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
def extract_img_feat(self, imgs):
'''
        Extract image features and fuse the multi-scale maps into one
Args:
imgs: B, n_cam, C, iH, iW
Returns:
img_feat: B * n_cam, C, H, W
'''
B, n_cam, C, iH, iW = imgs.shape
imgs = imgs.view(B * n_cam, C, iH, iW)
img_feats = self.img_backbone(imgs)
# reduce the channel dim
img_feats = self.img_neck(img_feats)
        # fuse the multi-scale feature maps into one
img_feat = self.upsample(img_feats)
return img_feat
def forward(self, imgs, img_metas, *args, points=None, **kwargs):
'''
Args:
imgs: torch.Tensor of shape [B, N, 3, H, W]
N: number of cams
img_metas:
# N=6, ['CAM_FRONT', 'CAM_FRONT_RIGHT', 'CAM_FRONT_LEFT', 'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT']
ego2cam: [B, N, 4, 4]
cam_intrinsics: [B, N, 3, 3]
cam2ego_rotations: [B, N, 3, 3]
cam2ego_translations: [B, N, 3]
...
Outs:
bev_feature: torch.Tensor of shape [B, C*nlvl, bH, bW]
'''
if self.use_image:
self.B = imgs.shape[0]
# Get transform matrix
ego2cam = []
for img_meta in img_metas:
ego2cam.append(img_meta['ego2img'])
img_shape = imgs.shape[-2:]
ego2cam = np.asarray(ego2cam)
# Image backbone
img_feats = self.extract_img_feat(imgs)
# IPM
bev_feat, bev_feat_mask = self.ipm(img_feats, ego2cam, img_shape)
            # merge the multi-level (per-height) features into one map
bev_feat = bev_feat.flatten(1, 2)
bev_feat = self.outconvs(bev_feat)
if self.use_lidar:
lidar_feat = self.get_lidar_feature(points)
if self.use_image:
bev_feat = torch.cat([bev_feat,lidar_feat],dim=1)
else:
bev_feat = lidar_feat
return bev_feat
def ipm(self, cam_feat, ego2cam, img_shape):
'''
        Inverse perspective mapping: project camera features onto BEV planes
Args:
cam_feat: B*ncam, C, cH, cW
img_shape: tuple(H, W)
Returns:
project_feat: B, C, nlvl, bH, bW
bev_feat_mask: B, 1, nlvl, bH, bW
'''
C = cam_feat.shape[1]
bev_grid = self.bev_planes.unsqueeze(0).repeat(self.B, 1, 1, 1, 1)
nlvl, bH, bW = bev_grid.shape[1:4]
bev_grid = bev_grid.flatten(1, 3) # B, nlvl*W*H, 3
# Find points in cam coords
# bev_grid_pos: B*ncam, nlvl*bH*bW, 2
bev_grid_pos, bev_cam_mask = get_campos(bev_grid, ego2cam, img_shape)
# B*cam, nlvl*bH, bW, 2
bev_grid_pos = bev_grid_pos.unflatten(-2, (nlvl*bH, bW))
# project feat from 2D to bev plane
projected_feature = F.grid_sample(
cam_feat, bev_grid_pos, align_corners=False).view(self.B, -1, C, nlvl, bH, bW) # B,cam,C,nlvl,bH,bW
# B,cam,nlvl,bH,bW
bev_feat_mask = bev_cam_mask.unflatten(-1, (nlvl, bH, bW))
        # reduce the camera dimension: average the features over the
        # cameras that actually see each BEV cell
bev_feat_mask = bev_feat_mask.unsqueeze(2)
projected_feature = (projected_feature*bev_feat_mask).sum(1)
num_feat = bev_feat_mask.sum(1)
projected_feature = projected_feature / \
num_feat.masked_fill(num_feat == 0, 1)
        # concatenate positional information
        # projected_feature: B, C+3, nlvl, bH, bW
bev_grid = bev_grid.view(self.B, nlvl, bH, bW,
3).permute(0, 4, 1, 2, 3)
projected_feature = torch.cat(
(projected_feature, bev_grid), dim=1)
return projected_feature, bev_feat_mask.sum(1) > 0
def get_lidar_feature(self, points):
ptensor, pmask = points
lidar_feature = self.pp(ptensor, pmask)
# bev_grid = self.bev_planes[...,:-1].unsqueeze(0).repeat(self.B, 1, 1, 1, 1)
# bev_grid = bev_grid[:,0]
# bev_grid = bev_grid.permute(0, 3, 1, 2)
# lidar_feature = torch.cat(
# (lidar_feature, bev_grid), dim=1)
lidar_feature = self.outconvs_lidar(lidar_feature)
return lidar_feature
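# A minimal standalone sketch of the F.grid_sample convention that ipm()
# relies on: sampling locations are normalized to [-1, 1], so (-1, -1)
# reads the top-left pixel and (1, 1) the bottom-right one (toy values;
# align_corners=True here just to make the corner reads exact).
import torch
import torch.nn.functional as F
feat = torch.arange(16.).view(1, 1, 4, 4)        # one camera, one channel, 4x4
grid = torch.tensor([[[[-1., -1.], [1., 1.]]]])  # top-left and bottom-right
print(F.grid_sample(feat, grid, align_corners=True))  # tensor([[[[ 0., 15.]]]])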
def construct_plane_grid(xbound, ybound, height: float, dtype=torch.float32):
'''
Returns:
plane: H, W, 3
'''
xmin, xmax = xbound[0], xbound[1]
num_x = int((xbound[1] - xbound[0]) / xbound[2])
ymin, ymax = ybound[0], ybound[1]
num_y = int((ybound[1] - ybound[0]) / ybound[2])
x = torch.linspace(xmin, xmax, num_x, dtype=dtype)
y = torch.linspace(ymin, ymax, num_y, dtype=dtype)
# [num_y, num_x]
y, x = torch.meshgrid(y, x)
z = torch.ones_like(x) * height
# [num_y, num_x, 3]
plane = torch.stack([x, y, z], dim=-1)
return plane
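# A quick sanity sketch with the default bounds used above: a 60m x 30m
# plane at 0.5m resolution yields a 60 x 120 grid of (x, y, z) coordinates.
grid = construct_plane_grid([-30.0, 30.0, 0.5], [-15.0, 15.0, 0.5], height=0.0)
print(grid.shape)  # torch.Size([60, 120, 3]); rows indexed by y, columns by x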
def get_campos(reference_points, ego2cam, img_shape):
'''
    Find each reference point's corresponding pixel in each camera
Args:
reference_points: [B, num_query, 3]
ego2cam: (B, num_cam, 4, 4)
Outs:
reference_points_cam: (B*num_cam, num_query, 2)
mask: (B, num_cam, num_query)
num_query == W*H
'''
ego2cam = reference_points.new_tensor(ego2cam) # (B, N, 4, 4)
reference_points = reference_points.clone()
B, num_query = reference_points.shape[:2]
num_cam = ego2cam.shape[1]
# reference_points (B, num_queries, 4)
reference_points = torch.cat(
(reference_points, torch.ones_like(reference_points[..., :1])), -1)
reference_points = reference_points.view(
B, 1, num_query, 4).repeat(1, num_cam, 1, 1).unsqueeze(-1)
ego2cam = ego2cam.view(
B, num_cam, 1, 4, 4).repeat(1, 1, num_query, 1, 1)
# reference_points_cam (B, num_cam, num_queries, 4)
reference_points_cam = (ego2cam @ reference_points).squeeze(-1)
    eps = 1e-9
    mask = (reference_points_cam[..., 2:3] > eps)
    # perspective divide; clamp the depth away from zero so points near the
    # camera plane do not blow up (they are filtered by the mask anyway)
    reference_points_cam = reference_points_cam[..., 0:2] / \
        reference_points_cam[..., 2:3].clamp(min=eps)
reference_points_cam[..., 0] /= img_shape[1]
reference_points_cam[..., 1] /= img_shape[0]
# from 0~1 to -1~1
reference_points_cam = (reference_points_cam - 0.5) * 2
mask = (mask & (reference_points_cam[..., 0:1] > -1.0)
& (reference_points_cam[..., 0:1] < 1.0)
& (reference_points_cam[..., 1:2] > -1.0)
& (reference_points_cam[..., 1:2] < 1.0))
# (B, num_cam, num_query)
mask = mask.view(B, num_cam, num_query)
reference_points_cam = reference_points_cam.view(B*num_cam, num_query, 2)
return reference_points_cam, mask
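# A minimal sanity sketch (hypothetical matrices): with an ego2img that is a
# bare pinhole intrinsic, a point 10m straight ahead on the optical axis
# should project to the image centre, i.e. (0, 0) after the [-1, 1]
# normalization, with a valid mask.
ego2img = np.eye(4)
ego2img[0, 0] = ego2img[1, 1] = 100.0       # focal lengths
ego2img[0, 2] = ego2img[1, 2] = 50.0        # principal point
pts = torch.tensor([[[0.0, 0.0, 10.0]]])    # B=1, one query on the z-axis
pos, mask = get_campos(pts, ego2img[None, None], (100, 100))
print(pos, mask)  # pos ~ (0, 0), mask True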
def _test():
pass
if __name__ == '__main__':
_test()
from .base_map_head import BaseMapHead
from .dg_head import DGHead
from .map_element_detector import MapElementDetector
from .polyline_generator import PolylineGenerator
from abc import ABCMeta, abstractmethod
import torch.nn as nn
from mmcv.runner import auto_fp16
from mmcv.utils import print_log
from mmdet.utils import get_root_logger
class BaseMapHead(nn.Module, metaclass=ABCMeta):
"""Base class for mappers."""
def __init__(self):
super(BaseMapHead, self).__init__()
self.fp16_enabled = False
def init_weights(self, pretrained=None):
"""Initialize the weights in detector.
Args:
pretrained (str, optional): Path to pre-trained weights.
Defaults to None.
"""
if pretrained is not None:
logger = get_root_logger()
print_log(f'load model from: {pretrained}', logger=logger)
@auto_fp16(apply_to=('img', ))
def forward(self, *args, **kwargs):
pass
@abstractmethod
def loss(self, pred, gt):
'''
Compute loss
Output:
dict(
loss: torch.Tensor
log_vars: dict(
str: float,
)
num_samples: int
)
'''
return
@abstractmethod
def post_process(self, pred):
'''
convert model predictions to vectorized outputs
the output format should be consistent with the evaluation function
'''
return
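# A minimal subclass sketch (entirely hypothetical) showing the contract the
# abstract methods are expected to satisfy; the L1 loss is only a placeholder.
import torch.nn.functional as F
class DummyMapHead(BaseMapHead):
    def loss(self, pred, gt):
        l1 = F.l1_loss(pred, gt)
        return dict(loss=l1,
                    log_vars={'l1': l1.item()},
                    num_samples=pred.shape[0])
    def post_process(self, pred):
        # vectorized output in whatever format the evaluator expects
        return pred.detach().cpu().tolist()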
# The causal layers are adapted from https://github.com/alexmt-scale/causal-transformer-decoder
# with some changes so that they work with the PolyGen-style generator.
import torch
import torch.nn as nn
from typing import Optional
from torch import Tensor
from mmcv.cnn.bricks.registry import ATTENTION
from mmcv.utils import build_from_cfg
def build_attention(cfg, default_args=None):
"""Builder for attention."""
return build_from_cfg(cfg, ATTENTION, default_args)
class CausalTransformerDecoder(nn.TransformerDecoder):
"""Implementation of a transformer decoder based on torch implementation but
more efficient. The difference is that it doesn't need to recompute the
embeddings of all the past decoded tokens but instead uses a cache to
store them. This makes use of the fact that the attention of a decoder is
causal, so new predicted tokens don't affect the old tokens' embedding bc
the corresponding attention cells are masked.
The complexity goes from seq_len^3 to seq_len^2.
This only happens in eval mode.
In training mode, teacher forcing makes these optimizations unnecessary. Hence the
Decoder acts like a regular nn.TransformerDecoder (except that the attention tgt
masks are handled for you).
"""
def forward(
self,
tgt: Tensor,
memory: Optional[Tensor] = None,
cache: Optional[Tensor] = None,
memory_mask: Optional[Tensor] = None,
tgt_key_padding_mask: Optional[Tensor] = None,
memory_key_padding_mask: Optional[Tensor] = None,
causal_mask: Optional[Tensor] = None,
) -> Tensor:
"""
Args:
tgt (Tensor): current_len_output x bsz x hidden_dim
memory (Tensor): len_encoded_seq x bsz x hidden_dim
cache (Optional[Tensor]):
n_layers x (current_len_output - 1) x bsz x hidden_dim
If current_len_output == 1, nothing is cached yet, so cache
should be None. Same if the module is in training mode.
others (Optional[Tensor]): see official documentations
Returns:
output (Tensor): current_len_output x bsz x hidden_dim
cache (Optional[Tensor]): n_layers x current_len_output x bsz x hidden_dim
Only returns it when module is in eval mode (no caching in training)
"""
output = tgt
if self.training:
if cache is not None:
raise ValueError(
"cache parameter should be None in training mode")
for mod in self.layers:
output = mod(
output,
memory,
memory_mask=memory_mask,
tgt_key_padding_mask=tgt_key_padding_mask,
memory_key_padding_mask=memory_key_padding_mask,
causal_mask=causal_mask,
only_last=False,
)
return output, cache
else:
new_token_cache = []
for i, mod in enumerate(self.layers):
output = mod(output, memory,
memory_mask=memory_mask,
tgt_key_padding_mask=tgt_key_padding_mask,
memory_key_padding_mask=memory_key_padding_mask,
causal_mask=causal_mask,
                             only_last=cache is not None)
new_token_cache.append(output)
                # prepend the cached embeddings of the already-decoded tokens
                if cache is not None:
                    output = torch.cat([cache[i], output], dim=0)
if cache is not None:
new_cache = torch.cat(
[cache, torch.stack(new_token_cache, dim=0)], dim=1)
else:
new_cache = torch.stack(new_token_cache, dim=0)
return output, new_cache
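# A minimal shape sketch of the caching scheme (all sizes hypothetical): the
# cache stores every layer's output for the tokens decoded so far, so each
# new step appends one slice instead of recomputing the whole prefix.
import torch
n_layers, bsz, dim = 2, 1, 8
cache = torch.randn(n_layers, 3, bsz, dim)     # three tokens decoded so far
new_tok = torch.randn(n_layers, 1, bsz, dim)   # per-layer output of the newest token
cache = torch.cat([cache, new_tok], dim=1)     # now n_layers x 4 x bsz x dim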
class CausalTransformerDecoderLayer(nn.TransformerDecoderLayer):
def __init__(self, *args, re_zero=True, norm_first=True, map_attn_cfg=None, **kwargs):
'''
Args:
            re_zero: If True, scale the residual branches by a learnable alpha initialized to zero (ReZero).
'''
super(CausalTransformerDecoderLayer, self).__init__(*args, **kwargs)
if re_zero:
self.res_weight1 = nn.Parameter(torch.FloatTensor([0, ]))
self.res_weight2 = nn.Parameter(torch.FloatTensor([0, ]))
self.res_weight3 = nn.Parameter(torch.FloatTensor([0, ]))
else:
self.res_weight1 = 1.
self.res_weight2 = 1.
self.res_weight3 = 1.
self.norm_first = norm_first
self.map_attn = None
if map_attn_cfg is not None:
self.map_attn = build_attention(map_attn_cfg)
def forward(
self,
tgt: Tensor,
memory: Optional[Tensor] = None,
memory_mask: Optional[Tensor] = None,
tgt_key_padding_mask: Optional[Tensor] = None,
memory_key_padding_mask: Optional[Tensor] = None,
causal_mask: Optional[Tensor] = None,
query: Optional[Tensor] = None,
only_last=False) -> Tensor:
"""
Args:
see CausalTransformerDecoder
            if query is not None, the layer performs the query stream
Returns:
Tensor:
If training: embedding of the whole layer: seq_len x bsz x hidden_dim
If eval mode: embedding of last token: 1 x bsz x hidden_dim
"""
if not self.norm_first:
raise ValueError(
"norm_first parameter should be True!")
if self.training:
# the official Pytorch implementation
x = tgt
if query is not None:
x = query
x = x + self.res_weight1 * \
self._sa_block(self.norm1(x), self.norm1(tgt), causal_mask,
tgt_key_padding_mask)
if memory is not None:
x = x + self.res_weight2 * \
self._mha_block(self.norm2(x), memory,
memory_mask, memory_key_padding_mask)
x = x + self.res_weight3*self._ff_block(self.norm3(x))
return x
        # This part is adapted from the official PyTorch implementation
        # so that only the last token gets modified and returned.
        # We follow the pre-LN transformer of https://arxiv.org/pdf/2002.04745v1.pdf.
x = tgt
if query is not None:
x = query
if only_last:
x = x[-1:]
if causal_mask is not None:
attn_mask = causal_mask
if only_last:
                attn_mask = attn_mask[-1:]  # keep only the mask row of the newest token
else:
attn_mask = None
# efficient self attention
x = x + self.res_weight1 * \
self._sa_block(self.norm1(x), self.norm1(tgt), attn_mask,
tgt_key_padding_mask)
# encoder-decoder attention
if memory is not None:
x = x + self.res_weight2 * \
self._mha_block(self.norm2(x), memory,
memory_mask, memory_key_padding_mask)
# final feed-forward network
x = x + self.res_weight3*self._ff_block(self.norm3(x))
return x
# self-attention block
def _sa_block(self, x: Tensor, mem: Tensor,
attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor]) -> Tensor:
x = self.self_attn(x, mem, mem,
attn_mask=attn_mask,
key_padding_mask=key_padding_mask,
need_weights=False)[0]
return self.dropout1(x)
# multihead attention block
def _mha_block(self, x: Tensor, mem: Tensor,
attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor]) -> Tensor:
x = self.multihead_attn(x, mem, mem,
attn_mask=attn_mask,
key_padding_mask=key_padding_mask,
need_weights=False)[0]
return self.dropout2(x)
# feed forward block
def _ff_block(self, x: Tensor) -> Tensor:
x = self.linear2(self.dropout(self.activation(self.linear1(x))))
return self.dropout3(x)
class PolygenTransformerEncoderLayer(nn.TransformerEncoderLayer):
def __init__(self, *args, re_zero=True, norm_first=True, **kwargs):
'''
Args:
            re_zero: If True, scale the residual branches by a learnable alpha initialized to zero (ReZero).
'''
super(PolygenTransformerEncoderLayer, self).__init__(*args, **kwargs)
if re_zero:
self.res_weight1 = nn.Parameter(torch.FloatTensor([0, ]))
self.res_weight2 = nn.Parameter(torch.FloatTensor([0, ]))
else:
self.res_weight1 = 1.
self.res_weight2 = 1.
self.norm_first = norm_first
def forward(self, src: Tensor, src_mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None) -> Tensor:
r"""Pass the input through the encoder layer.
Args:
src: the sequence to the encoder layer (required).
src_mask: the mask for the src sequence (optional).
src_key_padding_mask: the mask for the src keys per batch (optional).
Shape:
see the docs in Transformer class.
"""
# see Fig. 1 of https://arxiv.org/pdf/2002.04745v1.pdf
x = src
if self.norm_first:
x = x + self.res_weight1*self._sa_block(self.norm1(x), src_mask,
src_key_padding_mask)
x = x + self.res_weight2*self._ff_block(self.norm2(x))
else:
x = self.norm1(
x + self.res_weight1*self._sa_block(x, src_mask, src_key_padding_mask))
x = self.norm2(x + self.res_weight2*self._ff_block(x))
return x
# self-attention block
def _sa_block(self, x: Tensor,
attn_mask: Optional[Tensor], key_padding_mask: Optional[Tensor]) -> Tensor:
x = self.self_attn(x, x, x,
attn_mask=attn_mask,
key_padding_mask=key_padding_mask,
need_weights=False)[0]
return self.dropout1(x)
# feed forward block
def _ff_block(self, x: Tensor) -> Tensor:
x = self.linear2(self.dropout(self.activation(self.linear1(x))))
return self.dropout2(x)
def generate_square_subsequent_mask(sz: int, device: str = "cpu") -> torch.Tensor:
""" Generate the attention mask for causal decoding """
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
mask = (
mask.float()
.masked_fill(mask == 0, float("-inf"))
.masked_fill(mask == 1, float(0.0))
).to(device=device)
return mask
import torch
import torch.nn.functional as F
from torch import Tensor
def generate_square_subsequent_mask(sz: int, condition_len: int = 1, bool_out=False, device: str = "cpu") -> torch.Tensor:
""" Generate the attention mask for causal decoding """
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
if condition_len > 1:
mask[:condition_len,:condition_len] = 1
if not bool_out:
mask = (
mask.float()
.masked_fill(mask == 0, float("-inf"))
.masked_fill(mask == 1, float(0.0)))
return mask.to(device=device)
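# A small sketch of the mask for sz=4 with a 2-token condition prefix
# (illustrative values): the prefix block is fully visible to itself,
# later positions are revealed causally.
print(generate_square_subsequent_mask(4, condition_len=2, bool_out=True))
# tensor([[ True,  True, False, False],
#         [ True,  True, False, False],
#         [ True,  True,  True, False],
#         [ True,  True,  True,  True]])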
def dequantize_verts(verts, canvas_size: Tensor, add_noise=False):
    """Dequantizes integer vertices back to floats in [-1, 1]."""
    min_range = -1
    max_range = 1
    range_quantize = canvas_size
    verts = verts.type(torch.float32)
    verts = verts * (max_range - min_range) / range_quantize + min_range
    if add_noise:
        # jitter within one quantization bin so samples do not sit exactly on the grid
        verts += torch.rand_like(verts) * (max_range - min_range) / range_quantize
    return verts
def quantize_verts(
verts,
canvas_size: Tensor):
"""Convert vertices from its original range ([-1,1]) to discrete values in [0, n_bits**2 - 1].
Args:
verts: seqlen, 2
"""
min_range = -1
max_range = 1
range_quantize = canvas_size-1
verts_ratio = (verts - min_range) / (
max_range - min_range)
verts_quantize = verts_ratio * range_quantize
return verts_quantize.type(torch.int32)
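# A round-trip sketch (canvas size of 200 is an assumption): coordinates in
# [-1, 1] quantize to integer bins and dequantize back with roughly one
# quantization bin of error.
canvas = torch.tensor(200.)
v = torch.tensor([[-1.0, 0.0], [0.5, 1.0]])
q = quantize_verts(v, canvas)        # tensor([[  0,  99], [149, 199]], dtype=torch.int32)
v_back = dequantize_verts(q, canvas)
print(q, v_back)                     # v_back ~ v, up to ~0.01 per coordinate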
def top_k_logits(logits, k):
"""Masks logits such that logits not in top-k are small."""
if k == 0:
return logits
else:
values, _ = torch.topk(logits, k=k)
k_largest = torch.min(values)
logits = torch.where(logits < k_largest,
torch.ones_like(logits)*-1e9, logits)
return logits
def top_p_logits(logits, p):
"""Masks logits using nucleus (top-p) sampling."""
if p == 1:
return logits
else:
seq, dim = logits.shape[1:]
logits = logits.view(-1, dim)
sort_indices = torch.argsort(logits, dim=-1, descending=True)
probs = F.softmax(logits, dim=-1).gather(-1, sort_indices)
cumprobs = torch.cumsum(probs, dim=-1) - probs
        # Shifting the cumulative sum by one position (subtracting probs)
        # guarantees the top-1 candidate is never masked, so at least one
        # index is always kept.
        sort_mask = (cumprobs > p).type(logits.dtype)
top_p_mask = torch.zeros_like(logits)
top_p_mask = top_p_mask.scatter_add(-1, sort_indices, sort_mask)
logits -= top_p_mask * 1e9
return logits.view(-1, seq, dim)
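# A minimal usage sketch (toy values): restrict a categorical distribution
# with top-k filtering, then sample from the renormalized logits.
logits = torch.tensor([[[2.0, 1.0, 0.5, -1.0]]])  # bsz=1, seq=1, vocab=4
filtered = top_k_logits(logits, k=2)              # keep only the two best tokens
probs = F.softmax(filtered, dim=-1)
token = torch.multinomial(probs.view(-1, 4), 1)   # can only draw index 0 or 1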