Unverified commit cce49ba9 authored by Chengyu Wang, committed by GitHub

Add openlane v2 (#121)

parent dbf29e61
from .assigners import *
from .match_costs import *
# ==============================================================================
# Binaries and/or source for the following packages or projects
# are presented under one or more of the following open source licenses:
# assigners.py The OpenLane-V2 Dataset Authors Apache License, Version 2.0
#
# Contact wanghuijie@pjlab.org.cn if you have any issue.
#
# Copyright (c) 2023 The OpenLane-v2 Dataset Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import torch
from scipy.optimize import linear_sum_assignment
from mmdet.core.bbox.builder import BBOX_ASSIGNERS
from mmdet.core.bbox.assigners import HungarianAssigner, AssignResult
@BBOX_ASSIGNERS.register_module()
class LaneHungarianAssigner(HungarianAssigner):
def assign(self,
lane_pred,
cls_pred,
gt_lanes,
gt_labels,
img_meta,
gt_lanes_ignore=None,
eps=1e-7):
assert gt_lanes_ignore is None, \
'Only case when gt_lanes_ignore is None is supported.'
num_gts, num_lanes = gt_lanes.size(0), lane_pred.size(0)
# 1. assign -1 by default
assigned_gt_inds = lane_pred.new_full((num_lanes, ),
-1,
dtype=torch.long)
assigned_labels = lane_pred.new_full((num_lanes, ),
-1,
dtype=torch.long)
if num_gts == 0 or num_lanes == 0:
# No ground truth or boxes, return empty assignment
if num_gts == 0:
# No ground truth, assign all to background
assigned_gt_inds[:] = 0
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
# 2. compute the weighted costs
        # classification cost and lane regression cost
cls_cost = self.cls_cost(cls_pred, gt_labels)
# regression L1 cost
reg_cost = self.reg_cost(lane_pred, gt_lanes)
        # weighted sum of the two costs above
cost = cls_cost + reg_cost
# 3. do Hungarian matching on CPU using linear_sum_assignment
cost = cost.detach().cpu()
matched_row_inds, matched_col_inds = linear_sum_assignment(cost)
matched_row_inds = torch.from_numpy(matched_row_inds).to(
lane_pred.device)
matched_col_inds = torch.from_numpy(matched_col_inds).to(
lane_pred.device)
# 4. assign backgrounds and foregrounds
# assign all indices to backgrounds first
assigned_gt_inds[:] = 0
# assign foregrounds based on matching results
assigned_gt_inds[matched_row_inds] = matched_col_inds + 1
assigned_labels[matched_row_inds] = gt_labels[matched_col_inds]
return AssignResult(
num_gts, assigned_gt_inds, None, labels=assigned_labels)
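# ------------------------------------------------------------------------------
# Hedged usage sketch (added for illustration; not part of the original diff).
# It exercises the assigner above with random tensors. mmdet's built-in
# 'BBoxL1Cost' is used for reg_cost because it computes the same pairwise L1
# (torch.cdist, p=1) as the LaneL1Cost registered in match_costs.py below;
# the weights and tensor shapes here are assumptions, not values from this PR.
if __name__ == '__main__':
    assigner = LaneHungarianAssigner(
        cls_cost=dict(type='FocalLossCost', weight=1.5),
        reg_cost=dict(type='BBoxL1Cost', weight=0.02))
    lane_pred = torch.rand(50, 33)                # 50 queries, 11 points x 3 coords (assumed)
    cls_pred = torch.rand(50, 1)                  # single foreground class (assumed)
    gt_lanes = torch.rand(4, 33)                  # 4 ground-truth centerlines
    gt_labels = torch.zeros(4, dtype=torch.long)
    result = assigner.assign(lane_pred, cls_pred, gt_lanes, gt_labels, img_meta=None)
    print(result.gt_inds)                         # 0 = background, i > 0 = matched gt i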
# ==============================================================================
# Binaries and/or source for the following packages or projects
# are presented under one or more of the following open source licenses:
# match_costs.py The OpenLane-V2 Dataset Authors Apache License, Version 2.0
#
# Contact wanghuijie@pjlab.org.cn if you have any issue.
#
# Copyright (c) 2023 The OpenLane-v2 Dataset Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import torch
from mmdet.core.bbox.match_costs.builder import MATCH_COST
@MATCH_COST.register_module()
class LaneL1Cost:
r"""
Notes
-----
Adapted from https://github.com/open-mmlab/mmdetection/blob/master/mmdet/core/bbox/match_costs/match_cost.py#L11.
"""
def __init__(self, weight=1.):
self.weight = weight
def __call__(self, lane_pred, gt_lanes):
lane_cost = torch.cdist(lane_pred, gt_lanes, p=1)
return lane_cost * self.weight
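# ------------------------------------------------------------------------------
# Hedged sketch (added for illustration; not in the original diff): the cost is
# a pairwise L1 distance between flattened lane parameterizations, so P
# predictions against G ground truths yield a (P, G) matrix. Shapes below are
# assumptions.
if __name__ == '__main__':
    cost_fn = LaneL1Cost(weight=0.02)
    lane_pred = torch.rand(50, 33)     # 50 queries, e.g. 11 points x 3 coords
    gt_lanes = torch.rand(4, 33)       # 4 ground-truth centerlines
    print(cost_fn(lane_pred, gt_lanes).shape)  # torch.Size([50, 4])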
from .pipelines import *
from .openlane_v2_dataset import *
# ==============================================================================
# Binaries and/or source for the following packages or projects
# are presented under one or more of the following open source licenses:
# openlane_v2_dataset.py The OpenLane-V2 Dataset Authors Apache License, Version 2.0
#
# Contact wanghuijie@pjlab.org.cn if you have any issue.
#
# Copyright (c) 2023 The OpenLane-v2 Dataset Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import os
import cv2
import torch
import numpy as np
from math import factorial
from pyquaternion import Quaternion
import mmcv
from mmdet.datasets import DATASETS
from mmdet3d.datasets import Custom3DDataset
from openlanev2.dataset import Collection
from openlanev2.evaluation import evaluate as openlanev2_evaluate
from openlanev2.preprocessing import check_results
from openlanev2.visualization.utils import COLOR_DICT
COLOR_GT = (0, 255, 0)
COLOR_GT_TOPOLOGY = (0, 127, 0)
COLOR_PRED = (0, 0, 255)
COLOR_PRED_TOPOLOGY = (0, 0, 127)
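# Note (comment added for clarity, inferred from usage rather than stated in
# the diff): the palette imported from openlanev2 is RGB, while OpenCV draws in
# BGR, so the channels are flipped below before rendering.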
COLOR_DICT = {k: (v[2], v[1], v[0]) for k, v in COLOR_DICT.items()}
def render_pv(images, lidar2imgs, gt_lc, pred_lc, gt_te, gt_te_attr, pred_te, pred_te_attr):
results = []
for idx, (image, lidar2img) in enumerate(zip(images, lidar2imgs)):
if gt_lc is not None :
for lc in gt_lc:
xyz1 = np.concatenate([lc, np.ones((lc.shape[0], 1))], axis=1)
xyz1 = xyz1 @ lidar2img.T
xyz1 = xyz1[xyz1[:, 2] > 1e-5]
if xyz1.shape[0] == 0:
continue
points_2d = xyz1[:, :2] / xyz1[:, 2:3]
points_2d = points_2d.astype(int)
image = cv2.polylines(image, points_2d[None], False, COLOR_GT, 2)
if pred_lc is not None:
for lc in pred_lc:
xyz1 = np.concatenate([lc, np.ones((lc.shape[0], 1))], axis=1)
xyz1 = xyz1 @ lidar2img.T
xyz1 = xyz1[xyz1[:, 2] > 1e-5]
if xyz1.shape[0] == 0:
continue
points_2d = xyz1[:, :2] / xyz1[:, 2:3]
points_2d = points_2d.astype(int)
image = cv2.polylines(image, points_2d[None], False, COLOR_PRED, 2)
if idx == 0: # front view image
if gt_te is not None:
for bbox, attr in zip(gt_te, gt_te_attr):
b = bbox.astype(np.int32)
image = render_corner_rectangle(image, (b[0], b[1]), (b[2], b[3]), COLOR_DICT[attr], 3, 1)
if pred_te is not None:
for bbox, attr in zip(pred_te, pred_te_attr):
b = bbox.astype(np.int32)
image = cv2.rectangle(image, (b[0], b[1]), (b[2], b[3]), COLOR_DICT[attr], 3)
results.append(image)
return results
def render_corner_rectangle(img, pt1, pt2, color,
corner_thickness=3, edge_thickness=2,
centre_cross=False, lineType=cv2.LINE_8):
corner_length = min(abs(pt1[0] - pt2[0]), abs(pt1[1] - pt2[1])) // 4
e_args = [color, edge_thickness, lineType]
c_args = [color, corner_thickness, lineType]
# edges
img = cv2.line(img, (pt1[0] + corner_length, pt1[1]), (pt2[0] - corner_length, pt1[1]), *e_args)
img = cv2.line(img, (pt2[0], pt1[1] + corner_length), (pt2[0], pt2[1] - corner_length), *e_args)
img = cv2.line(img, (pt1[0], pt1[1] + corner_length), (pt1[0], pt2[1] - corner_length), *e_args)
img = cv2.line(img, (pt1[0] + corner_length, pt2[1]), (pt2[0] - corner_length, pt2[1]), *e_args)
# corners
img = cv2.line(img, pt1, (pt1[0] + corner_length, pt1[1]), *c_args)
img = cv2.line(img, pt1, (pt1[0], pt1[1] + corner_length), *c_args)
img = cv2.line(img, (pt2[0], pt1[1]), (pt2[0] - corner_length, pt1[1]), *c_args)
img = cv2.line(img, (pt2[0], pt1[1]), (pt2[0], pt1[1] + corner_length), *c_args)
img = cv2.line(img, (pt1[0], pt2[1]), (pt1[0] + corner_length, pt2[1]), *c_args)
img = cv2.line(img, (pt1[0], pt2[1]), (pt1[0], pt2[1] - corner_length), *c_args)
img = cv2.line(img, pt2, (pt2[0] - corner_length, pt2[1]), *c_args)
img = cv2.line(img, pt2, (pt2[0], pt2[1] - corner_length), *c_args)
if centre_cross:
cx, cy = int((pt1[0] + pt2[0]) / 2), int((pt1[1] + pt2[1]) / 2)
img = cv2.line(img, (cx - corner_length, cy), (cx + corner_length, cy), *e_args)
img = cv2.line(img, (cx, cy - corner_length), (cx, cy + corner_length), *e_args)
return img
def render_front_view(image, lidar2img, gt_lc, pred_lc, gt_te, pred_te, gt_topology_lcte, pred_topology_lcte):
if gt_topology_lcte is not None:
for lc_idx, lcte in enumerate(gt_topology_lcte):
for te_idx, connected in enumerate(lcte):
if connected:
lc = gt_lc[lc_idx]
lc = lc[len(lc) // 2][None, ...]
xyz1 = np.concatenate([lc, np.ones((lc.shape[0], 1))], axis=1)
xyz1 = xyz1 @ lidar2img.T
xyz1 = xyz1[xyz1[:, 2] > 1e-5]
if xyz1.shape[0] == 0:
continue
p1 = (xyz1[:, :2] / xyz1[:, 2:3])[0].astype(int)
te = gt_te[te_idx]
p2 = np.array([(te[0]+te[2])/2, te[3]]).astype(int)
image = cv2.arrowedLine(image, (p2[0], p2[1]), (p1[0], p1[1]), COLOR_GT_TOPOLOGY, tipLength=0.03)
if pred_topology_lcte is not None:
for lc_idx, lcte in enumerate(pred_topology_lcte):
for te_idx, connected in enumerate(lcte):
if connected:
lc = pred_lc[lc_idx]
lc = lc[len(lc) // 2][None, ...]
xyz1 = np.concatenate([lc, np.ones((lc.shape[0], 1))], axis=1)
xyz1 = xyz1 @ lidar2img.T
xyz1 = xyz1[xyz1[:, 2] > 1e-5]
if xyz1.shape[0] == 0:
continue
p1 = (xyz1[:, :2] / xyz1[:, 2:3])[0].astype(int)
te = pred_te[te_idx]
p2 = np.array([(te[0]+te[2])/2, te[3]]).astype(int)
image = cv2.arrowedLine(image, (p2[0], p2[1]), (p1[0], p1[1]), COLOR_PRED_TOPOLOGY, tipLength=0.03)
return image
def render_bev(gt_lc=None, pred_lc=None, gt_topology_lclc=None, pred_topology_lclc=None, map_size=[-52, 52, -27, 27], scale=20):
image = np.zeros((int(scale*(map_size[1]-map_size[0])), int(scale*(map_size[3] - map_size[2])), 3), dtype=np.uint8)
if gt_lc is not None:
for lc in gt_lc:
            draw_coor = (scale * (-lc[:, :2] + np.array([map_size[1], map_size[3]]))).astype(int)
image = cv2.polylines(image, [draw_coor[:, [1,0]]], False, COLOR_GT, max(round(scale * 0.2), 1))
image = cv2.circle(image, (draw_coor[0, 1], draw_coor[0, 0]), max(round(scale * 0.5), 3), COLOR_GT, -1)
image = cv2.circle(image, (draw_coor[-1, 1], draw_coor[-1, 0]), max(round(scale * 0.5), 3), COLOR_GT, -1)
if gt_topology_lclc is not None:
for l1_idx, lclc in enumerate(gt_topology_lclc):
for l2_idx, connected in enumerate(lclc):
if connected:
l1 = gt_lc[l1_idx]
l2 = gt_lc[l2_idx]
l1_mid = len(l1) // 2
l2_mid = len(l2) // 2
                    p1 = (scale * (-l1[l1_mid, :2] + np.array([map_size[1], map_size[3]]))).astype(int)
                    p2 = (scale * (-l2[l2_mid, :2] + np.array([map_size[1], map_size[3]]))).astype(int)
image = cv2.arrowedLine(image, (p1[1], p1[0]), (p2[1], p2[0]), COLOR_GT_TOPOLOGY, max(round(scale * 0.1), 1), tipLength=0.03)
if pred_lc is not None:
for lc in pred_lc:
            draw_coor = (scale * (-lc[:, :2] + np.array([map_size[1], map_size[3]]))).astype(int)
image = cv2.polylines(image, [draw_coor[:, [1,0]]], False, COLOR_PRED, max(round(scale * 0.2), 1))
image = cv2.circle(image, (draw_coor[0, 1], draw_coor[0, 0]), max(round(scale * 0.5), 3), COLOR_PRED, -1)
image = cv2.circle(image, (draw_coor[-1, 1], draw_coor[-1, 0]), max(round(scale * 0.5), 3), COLOR_PRED, -1)
if pred_topology_lclc is not None:
for l1_idx, lclc in enumerate(pred_topology_lclc):
for l2_idx, connected in enumerate(lclc):
if connected:
l1 = pred_lc[l1_idx]
l2 = pred_lc[l2_idx]
l1_mid = len(l1) // 2
l2_mid = len(l2) // 2
                    p1 = (scale * (-l1[l1_mid, :2] + np.array([map_size[1], map_size[3]]))).astype(int)
                    p2 = (scale * (-l2[l2_mid, :2] + np.array([map_size[1], map_size[3]]))).astype(int)
image = cv2.arrowedLine(image, (p1[1], p1[0]), (p2[1], p2[0]), COLOR_PRED_TOPOLOGY, max(round(scale * 0.1), 1), tipLength=0.03)
return image
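# Coordinate note (added for clarity; an inference from the code above, not a
# statement in the diff): map_size is [x_min, x_max, y_min, y_max] in the ego
# frame, and a point (x, y) lands at pixel
# (row, col) = (scale * (x_max - x), scale * (y_max - y)), so ego-forward (+x)
# points up and ego-left (+y) points left in the rendered BEV image.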
@DATASETS.register_module()
class OpenLaneV2SubsetADataset(Custom3DDataset):
CLASSES = [None]
def __init__(self,
data_root,
meta_root,
collection,
pipeline,
test_mode,
):
self.ann_file = f'{meta_root}/{collection}.pkl'
super().__init__(
data_root=data_root,
ann_file=self.ann_file,
pipeline=pipeline,
test_mode=test_mode,
)
def load_annotations(self, ann_file):
ann_file = ann_file.name.split('.pkl')[0].split('/')
self.collection = Collection(data_root=self.data_root, meta_root='/'.join(ann_file[:-1]), collection=ann_file[-1])
return self.collection.keys
def get_data_info(self, index):
split, segment_id, timestamp = self.data_infos[index]
frame = self.collection.get_frame_via_identifier((split, segment_id, timestamp))
img_paths = []
lidar2img_rts = []
lidar2cam_rts = []
cam_intrinsics = []
rots = []
trans = []
cam2imgs = []
for i, camera in enumerate(frame.get_camera_list()):
            assert i != 0 or camera == 'ring_front_center', \
                'the first image should be the front view'
lidar2cam_r = np.linalg.inv(frame.get_extrinsic(camera)['rotation'])
lidar2cam_t = frame.get_extrinsic(camera)['translation'] @ lidar2cam_r.T
lidar2cam_rt = np.eye(4)
lidar2cam_rt[:3, :3] = lidar2cam_r.T
lidar2cam_rt[3, :3] = -lidar2cam_t
intrinsic = frame.get_intrinsic(camera)['K']
viewpad = np.eye(4)
viewpad[:intrinsic.shape[0], :intrinsic.shape[1]] = intrinsic
lidar2img_rt = (viewpad @ lidar2cam_rt.T)
img_paths.append(frame.get_image_path(camera))
lidar2cam_rts.append(lidar2cam_rt.T)
cam_intrinsics.append(viewpad)
lidar2img_rts.append(lidar2img_rt)
rots.append(np.linalg.inv(frame.get_extrinsic(camera)['rotation']))
trans.append(-frame.get_extrinsic(camera)['translation'])
cam2imgs.append(frame.get_intrinsic(camera)['K'])
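        # can_bus layout (comment added for clarity; this mirrors the 18-dim
        # BEVFormer convention, which is an assumption here): [0:3] ego
        # translation, [3:7] ego rotation quaternion, [-2] yaw in radians,
        # [-1] yaw in degrees.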
can_bus = np.zeros(18)
        rotation = Quaternion(matrix=frame.get_pose()['rotation'])
can_bus[:3] = frame.get_pose()['translation']
can_bus[3:7] = rotation
patch_angle = rotation.yaw_pitch_roll[0] / np.pi * 180
if patch_angle < 0:
patch_angle += 360
can_bus[-2] = patch_angle / 180 * np.pi
can_bus[-1] = patch_angle
input_dict = {
'scene_token': segment_id,
'sample_idx': timestamp,
'img_paths': img_paths,
'lidar2cam': lidar2cam_rts,
'cam_intrinsic': cam_intrinsics,
'lidar2img': lidar2img_rts,
'rots': rots,
'trans': trans,
'cam2imgs': cam2imgs,
'can_bus': can_bus,
}
input_dict.update(self.get_ann_info(index))
return input_dict
def get_ann_info(self, index):
split, segment_id, timestamp = self.data_infos[index]
frame = self.collection.get_frame_via_identifier((split, segment_id, timestamp))
gt_lc = np.array([lc['points'] for lc in frame.get_annotations_lane_centerlines()], dtype=np.float32)
gt_lc_labels = np.zeros((len(gt_lc), ), dtype=np.int64)
gt_te = np.array([element['points'].flatten() for element in frame.get_annotations_traffic_elements()], dtype=np.float32).reshape(-1, 4)
        gt_te_labels = np.array([element['attribute'] for element in frame.get_annotations_traffic_elements()], dtype=np.int64)
gt_topology_lclc = frame.get_annotations_topology_lclc()
gt_topology_lcte = frame.get_annotations_topology_lcte()
assert gt_lc.shape[0] == gt_topology_lclc.shape[0] == gt_topology_lclc.shape[1] == gt_topology_lcte.shape[0]
assert gt_te.shape[0] == gt_topology_lcte.shape[1]
return {
'gt_lc': gt_lc,
'gt_lc_labels': gt_lc_labels,
'gt_te': gt_te,
'gt_te_labels': gt_te_labels,
'gt_topology_lclc': gt_topology_lclc,
'gt_topology_lcte': gt_topology_lcte,
}
def pre_pipeline(self, results):
pass
def prepare_train_data(self, index):
input_dict = self.get_data_info(index)
if input_dict is None:
return None
self.pre_pipeline(input_dict)
example = self.pipeline(input_dict)
return example
def evaluate(self,
results,
logger=None,
dump=None,
dump_dir=None,
visualization=False,
visualization_dir=None,
visualization_num=None,
**kwargs):
if logger:
            logger.info('Start formatting...')
pred_dict = self.format_preds(results)
if dump:
assert dump_dir is not None
assert check_results(pred_dict), "Please fill the missing keys."
output_path = os.path.join(dump_dir, 'result.pkl')
mmcv.dump(pred_dict, output_path)
if visualization:
assert visualization_dir is not None
self.visualize(pred_dict, visualization_dir, visualization_num, **kwargs)
if logger:
            logger.info('Start evaluation...')
metric_results = {}
for key, val in openlanev2_evaluate(ground_truth=self.ann_file, predictions=pred_dict).items():
for k, v in val.items():
metric_results[k if k != 'score' else key] = v
return metric_results
def format_preds(self, results):
predictions = {
'method': 'dummy',
'authors': ['dummy'],
'e-mail': 'dummy',
'institution / company': 'dummy',
# 'country / region': None,
'results': {},
}
for index, result in enumerate(results):
prediction = {
'lane_centerline': [],
'traffic_element': [],
'topology_lclc': None,
'topology_lcte': None,
}
# lc
pred_lc = result['pred_lc']
sorted_index = np.argsort(pred_lc[1][:, 0])[:100]
lanes, confidences = pred_lc[0][sorted_index], pred_lc[1][:, 0][sorted_index]
lanes = lanes.reshape(-1, lanes.shape[-1] // 3, 3)
def comb(n, k):
return factorial(n) // (factorial(k) * factorial(n - k))
n_points = 11
n_control = lanes.shape[1]
A = np.zeros((n_points, n_control))
t = np.arange(n_points) / (n_points - 1)
for i in range(n_points):
for j in range(n_control):
A[i, j] = comb(n_control - 1, j) * np.power(1 - t[i], n_control - 1 - j) * np.power(t[i], j)
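            # A is the Bernstein basis matrix (comment added for clarity):
            # A[i, j] = C(n-1, j) * (1 - t_i)^(n-1-j) * t_i^j, so the einsum
            # below evaluates each Bezier curve at n_points parameter values,
            # mapping (n_control, 3) control points to (n_points, 3) samples.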
bezier_A = torch.tensor(A, dtype=torch.float32)
lanes = torch.tensor(lanes, dtype=torch.float32)
lanes = torch.einsum('ij,njk->nik', bezier_A, lanes)
lanes = lanes.numpy()
for i, (lane, confidence) in enumerate(zip(lanes, confidences)):
prediction['lane_centerline'].append({
'id': i + 1000,
'points': lane.astype(np.float32),
'confidence': confidence,
})
# te
pred_te = result['pred_te']
for i, (bbox, confidence) in enumerate(zip(*pred_te)):
prediction['traffic_element'].append({
'id': i + 2000,
'attribute': bbox[-1],
'points': bbox[:-1].reshape(2, 2).astype(np.float32),
'confidence': confidence,
})
# topology
prediction['topology_lclc'] = result['pred_topology_lclc']
prediction['topology_lcte'] = result['pred_topology_lcte']
#
predictions['results'][self.data_infos[index]] = {
'predictions': prediction,
}
return predictions
def visualize(self, pred_dict, visualization_dir, visualization_num, confidence_threshold=0.3, **kwargs):
assert visualization_dir, 'Please specify visualization_dir for saving visualization.'
print('\nStart visualization...\n')
for index, (key, prediction) in enumerate(pred_dict['results'].items()):
if visualization_num and index >= visualization_num:
print(f'\nOnly {visualization_num} frames are visualized.\n')
return
frame = self.collection.get_frame_via_identifier(key)
prediction = prediction['predictions']
# calculate metric
pred_result = {
'method': 'dummy',
'authors': 'dummy',
'results': {
key: {
'predictions': prediction,
}
}
}
gt_result = {key: {'annotation': frame.get_annotations()}}
try:
metric_results = openlanev2_evaluate(gt_result, pred_result, verbose=False)
except Exception:
metric_results = None
# filter lc
pred_lc_mask = np.array([lc['confidence'] for lc in prediction['lane_centerline']]) > confidence_threshold
pred_lc = np.array([lc['points'] for lc in prediction['lane_centerline']])[pred_lc_mask]
# filter te
pred_te_mask = np.array([te['confidence'] for te in prediction['traffic_element']]) > confidence_threshold
pred_te = np.array([te['points'].flatten() for te in prediction['traffic_element']])[pred_te_mask]
pred_te_attr = np.array([te['attribute'] for te in prediction['traffic_element']])[pred_te_mask]
# filter topology
pred_topology_lclc = prediction['topology_lclc'][pred_lc_mask][:, pred_lc_mask] > confidence_threshold
pred_topology_lcte = prediction['topology_lcte'][pred_lc_mask][:, pred_te_mask] > confidence_threshold
data_info = self.get_data_info(index)
if frame.get_annotations():
gt_lc = np.array([lc['points'] for lc in frame.get_annotations_lane_centerlines()])
gt_te = np.array([element['points'].flatten() for element in frame.get_annotations_traffic_elements()]).reshape(-1, 4)
                gt_te_attr = np.array([element['attribute'] for element in frame.get_annotations_traffic_elements()])
gt_topology_lclc = frame.get_annotations_topology_lclc()
gt_topology_lcte = frame.get_annotations_topology_lcte()
else:
gt_lc, gt_te, gt_te_attr, gt_topology_lclc, gt_topology_lcte = None, None, None, None, None
# render pv
images = [mmcv.imread(img_path) for img_path in data_info['img_paths']]
images = render_pv(
images, data_info['lidar2img'],
gt_lc=gt_lc, pred_lc=pred_lc,
gt_te=gt_te, gt_te_attr=gt_te_attr, pred_te=pred_te, pred_te_attr=pred_te_attr,
)
for cam_idx, image in enumerate(images):
output_path = os.path.join(visualization_dir, f'{"/".join(key)}/pv_{frame.get_camera_list()[cam_idx]}.jpg')
mmcv.imwrite(image, output_path)
img_pts = [
(0, 3321, 2048, 4871),
(356, 1273, 1906, 3321),
(356, 4871, 1906, 6919),
(2048, 4096, 3598, 6144),
(2048, 2048, 3598, 4096),
(2048, 6144, 3598, 8192),
(2048, 0, 3598, 2048),
]
multiview = np.zeros([3598, 8192, 3], dtype=np.uint8)
for idx, pts in enumerate(img_pts):
multiview[pts[0]:pts[2], pts[1]:pts[3]] = images[idx]
multiview[2048:] = multiview[2048:, ::-1]
multiview = cv2.resize(multiview, None, fx=0.5, fy=0.5)
output_path = os.path.join(visualization_dir, f'{"/".join(key)}/pv_multiview.jpg')
mmcv.imwrite(multiview, output_path)
front_view = render_front_view(
images[0], data_info['lidar2img'][0],
gt_lc=gt_lc, pred_lc=pred_lc,
gt_te=gt_te, pred_te=pred_te,
gt_topology_lcte=gt_topology_lcte,
pred_topology_lcte=pred_topology_lcte,
)
output_path = os.path.join(visualization_dir, f'{"/".join(key)}/pv_{frame.get_camera_list()[0]}_topology.jpg')
mmcv.imwrite(front_view, output_path)
# render bev
if metric_results is not None:
info = []
for k, v in metric_results['OpenLane-V2 Score'].items():
if k == 'score':
continue
                    info.append(f'{k}: {v:.2f}')
info = ' / '.join(info)
else:
info = '-'
bev_lane = render_bev(
gt_lc=gt_lc, pred_lc=pred_lc,
map_size=[-52, 55, -27, 27], scale=20,
)
bev_lane = cv2.putText(bev_lane, info, (30, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_GT, 2)
output_path = os.path.join(visualization_dir, f'{"/".join(key)}/bev_lane.jpg')
mmcv.imwrite(bev_lane, output_path)
bev_gt = render_bev(
gt_lc=gt_lc,
gt_topology_lclc=gt_topology_lclc,
map_size=[-52, 55, -27, 27], scale=20,
)
bev_pred = render_bev(
pred_lc=pred_lc,
pred_topology_lclc=pred_topology_lclc,
map_size=[-52, 55, -27, 27], scale=20,
)
divider = np.ones((bev_gt.shape[0], 7, 3), dtype=np.uint8) * 128
bev_topology = np.concatenate([bev_gt, divider, bev_pred], axis=1)
bev_topology = cv2.putText(bev_topology, info, (30, 45), cv2.FONT_HERSHEY_SIMPLEX, 0.8, COLOR_GT, 2)
output_path = os.path.join(visualization_dir, f'{"/".join(key)}/bev_topology.jpg')
mmcv.imwrite(bev_topology, output_path)
from .formating import *
from .loading import *
from .transforms import *
# ==============================================================================
# Binaries and/or source for the following packages or projects
# are presented under one or more of the following open source licenses:
# formating.py The OpenLane-V2 Dataset Authors Apache License, Version 2.0
#
# Contact wanghuijie@pjlab.org.cn if you have any issue.
#
# Copyright (c) 2023 The OpenLane-v2 Dataset Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
from mmcv.parallel import DataContainer as DC
from mmdet.datasets import PIPELINES
from mmdet.datasets.pipelines import to_tensor
@PIPELINES.register_module()
class CustomDefaultFormatBundle:
def __init__(self):
pass
def __call__(self, results):
temp = to_tensor(np.concatenate([i[None, ...] for i in results['img']], axis=0))
results['img'] = DC(temp.permute(0, 3, 1, 2), stack=True)
if 'gt_lc' in results:
results['gt_lc'] = DC(to_tensor(results['gt_lc']))
if 'gt_lc_labels' in results:
results['gt_lc_labels'] = DC(to_tensor(results['gt_lc_labels']))
if 'gt_te' in results:
results['gt_te'] = DC(to_tensor(results['gt_te']))
if 'gt_te_labels' in results:
results['gt_te_labels'] = DC(to_tensor(results['gt_te_labels']))
if 'gt_topology_lclc' in results:
results['gt_topology_lclc'] = DC(to_tensor(results['gt_topology_lclc']))
if 'gt_topology_lcte' in results:
results['gt_topology_lcte'] = DC(to_tensor(results['gt_topology_lcte']))
return results
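# ------------------------------------------------------------------------------
# Hedged sketch (added for illustration; not in the original diff): the bundle
# stacks the per-view HWC images into one (N, C, H, W) tensor wrapped in a
# DataContainer. The dummy shapes are assumptions.
if __name__ == '__main__':
    dummy = {'img': [np.zeros((4, 6, 3), dtype=np.float32) for _ in range(2)]}
    out = CustomDefaultFormatBundle()(dummy)
    print(out['img'].data.shape)  # torch.Size([2, 3, 4, 6])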
# ==============================================================================
# Binaries and/or source for the following packages or projects
# are presented under one or more of the following open source licenses:
# loading.py The OpenLane-V2 Dataset Authors Apache License, Version 2.0
#
# Contact wanghuijie@pjlab.org.cn if you have any issue.
#
# Copyright (c) 2023 The OpenLane-v2 Dataset Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
import mmcv
from mmdet.datasets import PIPELINES
from mmdet3d.datasets.pipelines import LoadMultiViewImageFromFiles
@PIPELINES.register_module()
class CustomLoadMultiViewImageFromFiles(LoadMultiViewImageFromFiles):
def __call__(self, results):
filename = results['img_paths']
img = [mmcv.imread(name, self.color_type) for name in filename]
if self.to_float32:
img = [i.astype(np.float32) for i in img]
results['img'] = img
results['img_shape'] = [i.shape for i in results['img']]
return results
# ==============================================================================
# Binaries and/or source for the following packages or projects
# are presented under one or more of the following open source licenses:
# transforms.py The OpenLane-V2 Dataset Authors Apache License, Version 2.0
#
# Contact wanghuijie@pjlab.org.cn if you have any issue.
#
# Copyright (c) 2023 The OpenLane-v2 Dataset Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import numpy as np
from numpy import random
from math import factorial
import mmcv
from mmdet.datasets import PIPELINES
@PIPELINES.register_module()
class ResizeFrontView:
def __init__(self):
pass
def __call__(self, results):
assert 'ring_front_center' in results['img_paths'][0], \
'the first image should be the front view'
        # image
        front_view = results['img'][0]
        h, w, _ = front_view.shape
        resized_front_view, w_scale, h_scale = mmcv.imresize(
            front_view,
            (h, w),
            return_scale=True,
        )
        results['img'][0] = resized_front_view
        results['img_shape'][0] = resized_front_view.shape
# gt
scale_factor = np.array(
[w_scale, h_scale, w_scale, h_scale],
dtype=np.float32,
)
results['scale_factor'] = scale_factor
if 'gt_te' in results:
results['gt_te'] = results['gt_te'] * results['scale_factor']
# intrinsic
lidar2cam_r = results['rots'][0]
lidar2cam_t = (-results['trans'][0]) @ lidar2cam_r.T
lidar2cam_rt = np.eye(4)
lidar2cam_rt[:3, :3] = lidar2cam_r.T
lidar2cam_rt[3, :3] = -lidar2cam_t
intrinsic = results['cam2imgs'][0]
viewpad = np.eye(4)
viewpad[:intrinsic.shape[0], :intrinsic.shape[1]] = intrinsic
cam_s = np.eye(4)
cam_s[0, 0] *= w_scale
cam_s[1, 1] *= h_scale
viewpad = cam_s @ viewpad
intrinsic = viewpad[:intrinsic.shape[0], :intrinsic.shape[1]]
lidar2img_rt = (viewpad @ lidar2cam_rt.T)
results['cam_intrinsic'][0] = viewpad
results['lidar2img'][0] = lidar2img_rt
results['cam2imgs'][0] = intrinsic
return results
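# Note on the intrinsic rescaling above (added for clarity; an inference from
# the code, not a statement in the diff): resizing an image by (w_scale,
# h_scale) rescales the pinhole intrinsics as K' = S @ K with
# S = diag(w_scale, h_scale, 1):
#     K' = [[fx * w_scale, 0,            cx * w_scale],
#           [0,            fy * h_scale, cy * h_scale],
#           [0,            0,            1           ]]
# which is what cam_s @ viewpad computes on the padded 4x4 matrix.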
@PIPELINES.register_module()
class NormalizeMultiviewImage:
r"""
Notes
-----
Adapted from https://github.com/fundamentalvision/BEVFormer/blob/master/projects/mmdet3d_plugin/datasets/pipelines/transform_3d.py#L62.
Normalize the image.
Added key is "img_norm_cfg".
Args:
mean (sequence): Mean values of 3 channels.
std (sequence): Std values of 3 channels.
to_rgb (bool): Whether to convert the image from BGR to RGB,
default is true.
"""
def __init__(self, mean, std, to_rgb=True):
self.mean = np.array(mean, dtype=np.float32)
self.std = np.array(std, dtype=np.float32)
self.to_rgb = to_rgb
def __call__(self, results):
"""Call function to normalize images.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Normalized results, 'img_norm_cfg' key is added into
result dict.
"""
results['img'] = [mmcv.imnormalize(img, self.mean, self.std, self.to_rgb) for img in results['img']]
results['img_norm_cfg'] = dict(
mean=self.mean, std=self.std, to_rgb=self.to_rgb)
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(mean={self.mean}, std={self.std}, to_rgb={self.to_rgb})'
return repr_str
@PIPELINES.register_module()
class PhotoMetricDistortionMultiViewImage:
r"""
Notes
-----
Adapted from https://github.com/fundamentalvision/BEVFormer/blob/master/projects/mmdet3d_plugin/datasets/pipelines/transform_3d.py#L99.
Apply photometric distortion to image sequentially, every transformation
is applied with a probability of 0.5. The position of random contrast is in
second or second to last.
1. random brightness
2. random contrast (mode 0)
3. convert color from BGR to HSV
4. random saturation
5. random hue
6. convert color from HSV to BGR
7. random contrast (mode 1)
8. randomly swap channels
Args:
brightness_delta (int): delta of brightness.
contrast_range (tuple): range of contrast.
saturation_range (tuple): range of saturation.
hue_delta (int): delta of hue.
"""
def __init__(self,
brightness_delta=32,
contrast_range=(0.5, 1.5),
saturation_range=(0.5, 1.5),
hue_delta=18):
self.brightness_delta = brightness_delta
self.contrast_lower, self.contrast_upper = contrast_range
self.saturation_lower, self.saturation_upper = saturation_range
self.hue_delta = hue_delta
def __call__(self, results):
"""Call function to perform photometric distortion on images.
Args:
results (dict): Result dict from loading pipeline.
Returns:
dict: Result dict with images distorted.
"""
imgs = results['img']
new_imgs = []
for img in imgs:
assert img.dtype == np.float32, \
'PhotoMetricDistortion needs the input image of dtype np.float32,'\
' please set "to_float32=True" in "LoadImageFromFile" pipeline'
# random brightness
if random.randint(2):
delta = random.uniform(-self.brightness_delta,
self.brightness_delta)
img += delta
# mode == 0 --> do random contrast first
# mode == 1 --> do random contrast last
mode = random.randint(2)
if mode == 1:
if random.randint(2):
alpha = random.uniform(self.contrast_lower,
self.contrast_upper)
img *= alpha
# convert color from BGR to HSV
img = mmcv.bgr2hsv(img)
# random saturation
if random.randint(2):
img[..., 1] *= random.uniform(self.saturation_lower,
self.saturation_upper)
# random hue
if random.randint(2):
img[..., 0] += random.uniform(-self.hue_delta, self.hue_delta)
img[..., 0][img[..., 0] > 360] -= 360
img[..., 0][img[..., 0] < 0] += 360
# convert color from HSV to BGR
img = mmcv.hsv2bgr(img)
# random contrast
if mode == 0:
if random.randint(2):
alpha = random.uniform(self.contrast_lower,
self.contrast_upper)
img *= alpha
# randomly swap channels
if random.randint(2):
img = img[..., random.permutation(3)]
new_imgs.append(img)
results['img'] = new_imgs
return results
def __repr__(self):
repr_str = self.__class__.__name__
repr_str += f'(\nbrightness_delta={self.brightness_delta},\n'
repr_str += 'contrast_range='
repr_str += f'{(self.contrast_lower, self.contrast_upper)},\n'
repr_str += 'saturation_range='
repr_str += f'{(self.saturation_lower, self.saturation_upper)},\n'
repr_str += f'hue_delta={self.hue_delta})'
return repr_str
@PIPELINES.register_module()
class CustomPadMultiViewImage:
def __init__(self, size_divisor=None, pad_val=0):
self.size_divisor = size_divisor
self.pad_val = pad_val
def __call__(self, results):
max_h = max([img.shape[0] for img in results['img']])
max_w = max([img.shape[1] for img in results['img']])
padded_img = [mmcv.impad(img, shape=(max_h, max_w), pad_val=self.pad_val) for img in results['img']]
if self.size_divisor is not None:
padded_img = [mmcv.impad_to_multiple(
img, self.size_divisor, pad_val=self.pad_val) for img in padded_img]
results['img'] = padded_img
results['pad_shape'] = [img.shape for img in padded_img]
results['pad_fixed_size'] = None
results['pad_size_divisor'] = self.size_divisor
return results
def __repr__(self):
repr_str = self.__class__.__name__
        repr_str += f'(size_divisor={self.size_divisor}, '
repr_str += f'pad_val={self.pad_val})'
return repr_str
@PIPELINES.register_module()
class CustomParameterizeLane:
def __init__(self, method, method_para):
method_list = ['bezier', 'polygon', 'bezier_Direction_attribute', 'bezier_Endpointfixed']
self.method = method
        if self.method not in method_list:
            raise NotImplementedError(f'{method} is not implemented!')
self.method_para = method_para
def __call__(self, results):
centerlines = results['gt_lc']
para_centerlines = getattr(self, self.method)(centerlines, **self.method_para)
results['gt_lc'] = para_centerlines
return results
def comb(self, n, k):
return factorial(n) // (factorial(k) * factorial(n - k))
def fit_bezier(self, points, n_control):
n_points = len(points)
A = np.zeros((n_points, n_control))
t = np.arange(n_points) / (n_points - 1)
for i in range(n_points):
for j in range(n_control):
A[i, j] = self.comb(n_control - 1, j) * np.power(1 - t[i], n_control - 1 - j) * np.power(t[i], j)
conts = np.linalg.lstsq(A, points, rcond=None)
return conts
def fit_bezier_Endpointfixed(self, points, n_control):
n_points = len(points)
A = np.zeros((n_points, n_control))
t = np.arange(n_points) / (n_points - 1)
for i in range(n_points):
for j in range(n_control):
A[i, j] = self.comb(n_control - 1, j) * np.power(1 - t[i], n_control - 1 - j) * np.power(t[i], j)
A_BE = A[1:-1, 1:-1]
_points = points[1:-1]
_points = _points - A[1:-1, 0].reshape(-1, 1) @ points[0].reshape(1, -1) - A[1:-1, -1].reshape(-1, 1) @ points[-1].reshape(1, -1)
conts = np.linalg.lstsq(A_BE, _points, rcond=None)
control_points = np.zeros((n_control, points.shape[1]))
control_points[0] = points[0]
control_points[-1] = points[-1]
control_points[1:-1] = conts[0]
return control_points
def bezier(self, input_data, n_control=2):
coeffs_list = []
for idx, centerline in enumerate(input_data):
sorted_x = np.array(centerline[:, 1])
sorted_y = np.array(centerline[:, 0])
points = np.array(list(zip(sorted_x, sorted_y)))
res = self.fit_bezier(points, n_control)[0]
start_res = res[0]
end_res = res[-1]
first_diff = (np.sum(np.square(start_res - points[0]))) + (np.sum(np.square(end_res - points[-1])))
second_diff = (np.sum(np.square(start_res - points[-1]))) + (np.sum(np.square(end_res - points[0])))
if first_diff <= second_diff:
fin_res = res
else:
fin_res = np.zeros_like(res)
for m in range(len(res)):
fin_res[len(res) - m - 1] = res[m]
fin_res = np.clip(fin_res, 0, 1)
coeffs_list.append(np.reshape(np.float32(fin_res), (-1)))
return np.array(coeffs_list)
def bezier_Direction_attribute(self, input_data, n_control=3):
coeffs_list = []
for idx, centerline in enumerate(input_data):
sorted_x = np.array(centerline[:, 1])
sorted_y = np.array(centerline[:, 0])
points = np.array(list(zip(sorted_x, sorted_y)))
res = self.fit_bezier(points, n_control)[0]
fin_res = np.clip(res, 0, 1)
start_res = res[0]
end_res = res[-1]
first_diff = (np.sum(np.square(start_res - points[0]))) + (np.sum(np.square(end_res - points[-1])))
second_diff = (np.sum(np.square(start_res - points[-1]))) + (np.sum(np.square(end_res - points[0])))
if first_diff <= second_diff:
da = 0
else:
da = 1
fin_res = np.append(fin_res, da)
coeffs_list.append(np.reshape(np.float32(fin_res), (-1)))
return np.array(coeffs_list)
def bezier_Endpointfixed(self, input_data, n_control=2):
coeffs_list = []
for idx, centerline in enumerate(input_data):
res = self.fit_bezier_Endpointfixed(centerline, n_control)
coeffs = res.flatten()
coeffs_list.append(coeffs)
return np.array(coeffs_list, dtype=np.float32)
def polygon(self, input_data, key_rep='Bounding Box'):
keypoints = []
for idx, centerline in enumerate(input_data):
sorted_x = np.array(centerline[:, 1])
sorted_y = np.array(centerline[:, 0])
points = np.array(list(zip(sorted_x, sorted_y)))
if key_rep not in ['Bounding Box', 'SME', 'Extreme Points']:
raise Exception(f"{key_rep} not existed!")
elif key_rep == 'Bounding Box':
res = np.array(
[points[:, 0].min(), points[:, 1].min(), points[:, 0].max(), points[:, 1].max()]).reshape((2, 2))
keypoints.append(np.reshape(np.float32(res), (-1)))
elif key_rep == 'SME':
res = np.array([points[0], points[-1], points[int(len(points) / 2)]])
keypoints.append(np.reshape(np.float32(res), (-1)))
else:
                min_x = points[:, 0].min()
                ind_left = np.where(points[:, 0] == min_x)
                max_x = points[:, 0].max()
                ind_right = np.where(points[:, 0] == max_x)
                max_y = points[:, 1].max()
                ind_top = np.where(points[:, 1] == max_y)
                min_y = points[:, 1].min()
                ind_bottom = np.where(points[:, 1] == min_y)
                res = np.array(
                    [points[ind_left[0][0]], points[ind_right[0][0]], points[ind_top[0][0]], points[ind_bottom[0][0]]])
keypoints.append(np.reshape(np.float32(res), (-1)))
return np.array(keypoints)
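# ------------------------------------------------------------------------------
# Hedged sketch (added for illustration; not part of the original diff):
# fitting a straight 3D polyline with 4 control points via the endpoint-fixed
# least-squares fit above. The fitted first/last control points coincide with
# the input endpoints by construction.
if __name__ == '__main__':
    param = CustomParameterizeLane(method='bezier_Endpointfixed',
                                   method_para=dict(n_control=4))
    line = np.linspace([0., 0., 0.], [9., 3., 1.], num=10)
    ctrl = param.fit_bezier_Endpointfixed(line, n_control=4)
    assert np.allclose(ctrl[0], line[0]) and np.allclose(ctrl[-1], line[-1])
    print(ctrl.shape)  # (4, 3)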
from .detectors import *
from .heads import *
from .necks import *
from .modules import *
from .backbones import *
from .intern_image import InternImage
__all__ = ['InternImage']
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
import torch
import torch.nn as nn
from collections import OrderedDict
import torch.utils.checkpoint as checkpoint
from timm.models.layers import trunc_normal_, DropPath
from mmcv.runner import _load_checkpoint
from mmcv.cnn import constant_init, trunc_normal_init
from mmdet.utils import get_root_logger
from mmdet.models.builder import BACKBONES
import torch.nn.functional as F
from .ops_dcnv3 import modules as opsm
class to_channels_first(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 3, 1, 2)
class to_channels_last(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 2, 3, 1)
def build_norm_layer(dim,
norm_layer,
in_format='channels_last',
out_format='channels_last',
eps=1e-6):
layers = []
if norm_layer == 'BN':
if in_format == 'channels_last':
layers.append(to_channels_first())
layers.append(nn.BatchNorm2d(dim))
if out_format == 'channels_last':
layers.append(to_channels_last())
elif norm_layer == 'LN':
if in_format == 'channels_first':
layers.append(to_channels_last())
layers.append(nn.LayerNorm(dim, eps=eps))
if out_format == 'channels_first':
layers.append(to_channels_first())
else:
raise NotImplementedError(
f'build_norm_layer does not support {norm_layer}')
return nn.Sequential(*layers)
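# Example (added for clarity; a sketch, not from the original source):
# build_norm_layer(64, 'LN', in_format='channels_first', out_format='channels_first')
# returns Sequential(to_channels_last(), LayerNorm(64), to_channels_first()),
# letting LayerNorm, which normalizes the last dimension, operate on NCHW
# tensors without callers having to permute manually.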
def build_act_layer(act_layer):
if act_layer == 'ReLU':
return nn.ReLU(inplace=True)
elif act_layer == 'SiLU':
return nn.SiLU(inplace=True)
elif act_layer == 'GELU':
return nn.GELU()
raise NotImplementedError(f'build_act_layer does not support {act_layer}')
class CrossAttention(nn.Module):
r""" Cross Attention Module
Args:
dim (int): Number of input channels.
num_heads (int): Number of attention heads. Default: 8
qkv_bias (bool, optional): If True, add a learnable bias to q, k, v.
Default: False.
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
attn_drop (float, optional): Dropout ratio of attention weight.
Default: 0.0
proj_drop (float, optional): Dropout ratio of output. Default: 0.0
attn_head_dim (int, optional): Dimension of attention head.
out_dim (int, optional): Dimension of output.
"""
def __init__(self,
dim,
num_heads=8,
qkv_bias=False,
qk_scale=None,
attn_drop=0.,
proj_drop=0.,
attn_head_dim=None,
out_dim=None):
super().__init__()
if out_dim is None:
out_dim = dim
self.num_heads = num_heads
head_dim = dim // num_heads
if attn_head_dim is not None:
head_dim = attn_head_dim
all_head_dim = head_dim * self.num_heads
self.scale = qk_scale or head_dim ** -0.5
assert all_head_dim == dim
self.q = nn.Linear(dim, all_head_dim, bias=False)
self.k = nn.Linear(dim, all_head_dim, bias=False)
self.v = nn.Linear(dim, all_head_dim, bias=False)
if qkv_bias:
self.q_bias = nn.Parameter(torch.zeros(all_head_dim))
self.k_bias = nn.Parameter(torch.zeros(all_head_dim))
self.v_bias = nn.Parameter(torch.zeros(all_head_dim))
else:
self.q_bias = None
self.k_bias = None
self.v_bias = None
self.attn_drop = nn.Dropout(attn_drop)
self.proj = nn.Linear(all_head_dim, out_dim)
self.proj_drop = nn.Dropout(proj_drop)
def forward(self, x, k=None, v=None):
B, N, C = x.shape
N_k = k.shape[1]
N_v = v.shape[1]
q_bias, k_bias, v_bias = None, None, None
if self.q_bias is not None:
q_bias = self.q_bias
k_bias = self.k_bias
v_bias = self.v_bias
q = F.linear(input=x, weight=self.q.weight, bias=q_bias)
q = q.reshape(B, N, 1, self.num_heads,
-1).permute(2, 0, 3, 1,
4).squeeze(0) # (B, N_head, N_q, dim)
k = F.linear(input=k, weight=self.k.weight, bias=k_bias)
k = k.reshape(B, N_k, 1, self.num_heads, -1).permute(2, 0, 3, 1,
4).squeeze(0)
v = F.linear(input=v, weight=self.v.weight, bias=v_bias)
v = v.reshape(B, N_v, 1, self.num_heads, -1).permute(2, 0, 3, 1,
4).squeeze(0)
q = q * self.scale
attn = (q @ k.transpose(-2, -1)) # (B, N_head, N_q, N_k)
attn = attn.softmax(dim=-1)
attn = self.attn_drop(attn)
x = (attn @ v).transpose(1, 2).reshape(B, N, -1)
x = self.proj(x)
x = self.proj_drop(x)
return x
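# Shape walkthrough for CrossAttention.forward (annotation added for clarity,
# not present in the source): x is (B, N, C) queries and k/v are (B, N_k, C) /
# (B, N_v, C); after the linear projections each is reshaped to
# (B, num_heads, seq_len, head_dim), attn is (B, num_heads, N, N_k), and the
# output is projected back to (B, N, out_dim).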
class AttentiveBlock(nn.Module):
r"""Attentive Block
Args:
dim (int): Number of input channels.
num_heads (int): Number of attention heads. Default: 8
qkv_bias (bool, optional): If True, add a learnable bias to q, k, v.
Default: False.
qk_scale (float | None, optional): Override default qk scale of
head_dim ** -0.5 if set. Default: None.
drop (float, optional): Dropout rate. Default: 0.0.
attn_drop (float, optional): Attention dropout rate. Default: 0.0.
drop_path (float | tuple[float], optional): Stochastic depth rate.
Default: 0.0.
norm_layer (nn.Module, optional): Normalization layer. Default: nn.LayerNorm.
attn_head_dim (int, optional): Dimension of attention head. Default: None.
out_dim (int, optional): Dimension of output. Default: None.
"""
def __init__(self,
dim,
num_heads,
qkv_bias=False,
qk_scale=None,
drop=0.,
attn_drop=0.,
drop_path=0.,
norm_layer="LN",
attn_head_dim=None,
out_dim=None):
super().__init__()
self.norm1_q = build_norm_layer(dim, norm_layer, eps=1e-6)
self.norm1_k = build_norm_layer(dim, norm_layer, eps=1e-6)
self.norm1_v = build_norm_layer(dim, norm_layer, eps=1e-6)
self.cross_dcn = CrossAttention(dim,
num_heads=num_heads,
qkv_bias=qkv_bias,
qk_scale=qk_scale,
attn_drop=attn_drop,
proj_drop=drop,
attn_head_dim=attn_head_dim,
out_dim=out_dim)
self.drop_path = DropPath(
drop_path) if drop_path > 0. else nn.Identity()
def forward(self,
x_q,
x_kv,
pos_q,
pos_k,
bool_masked_pos,
rel_pos_bias=None):
x_q = self.norm1_q(x_q + pos_q)
x_k = self.norm1_k(x_kv + pos_k)
x_v = self.norm1_v(x_kv)
x = self.cross_dcn(x_q, k=x_k, v=x_v)
return x
class AttentionPoolingBlock(AttentiveBlock):
def forward(self, x):
x_q = x.mean(1, keepdim=True)
x_kv = x
pos_q, pos_k = 0, 0
x = super().forward(x_q, x_kv, pos_q, pos_k,
bool_masked_pos=None,
rel_pos_bias=None)
x = x.squeeze(1)
return x
class StemLayer(nn.Module):
r""" Stem layer of InternImage
Args:
in_chans (int): number of input channels
out_chans (int): number of output channels
act_layer (str): activation layer
norm_layer (str): normalization layer
"""
def __init__(self,
in_chans=3,
out_chans=96,
act_layer='GELU',
norm_layer='BN'):
super().__init__()
self.conv1 = nn.Conv2d(in_chans,
out_chans // 2,
kernel_size=3,
stride=2,
padding=1)
self.norm1 = build_norm_layer(out_chans // 2, norm_layer,
'channels_first', 'channels_first')
self.act = build_act_layer(act_layer)
self.conv2 = nn.Conv2d(out_chans // 2,
out_chans,
kernel_size=3,
stride=2,
padding=1)
self.norm2 = build_norm_layer(out_chans, norm_layer, 'channels_first',
'channels_last')
def forward(self, x):
x = self.conv1(x)
x = self.norm1(x)
x = self.act(x)
x = self.conv2(x)
x = self.norm2(x)
return x
class DownsampleLayer(nn.Module):
r""" Downsample layer of InternImage
Args:
channels (int): number of input channels
norm_layer (str): normalization layer
"""
def __init__(self, channels, norm_layer='LN'):
super().__init__()
self.conv = nn.Conv2d(channels,
2 * channels,
kernel_size=3,
stride=2,
padding=1,
bias=False)
self.norm = build_norm_layer(2 * channels, norm_layer,
'channels_first', 'channels_last')
def forward(self, x):
x = self.conv(x.permute(0, 3, 1, 2))
x = self.norm(x)
return x
class MLPLayer(nn.Module):
r""" MLP layer of InternImage
Args:
in_features (int): number of input features
hidden_features (int): number of hidden features
out_features (int): number of output features
act_layer (str): activation layer
drop (float): dropout rate
"""
def __init__(self,
in_features,
hidden_features=None,
out_features=None,
act_layer='GELU',
drop=0.):
super().__init__()
out_features = out_features or in_features
hidden_features = hidden_features or in_features
self.fc1 = nn.Linear(in_features, hidden_features)
self.act = build_act_layer(act_layer)
self.fc2 = nn.Linear(hidden_features, out_features)
self.drop = nn.Dropout(drop)
def forward(self, x):
x = self.fc1(x)
x = self.act(x)
x = self.drop(x)
x = self.fc2(x)
x = self.drop(x)
return x
class InternImageLayer(nn.Module):
r""" Basic layer of InternImage
Args:
core_op (nn.Module): core operation of InternImage
channels (int): number of input channels
groups (list): Groups of each block.
mlp_ratio (float): ratio of mlp hidden features to input channels
drop (float): dropout rate
drop_path (float): drop path rate
act_layer (str): activation layer
norm_layer (str): normalization layer
post_norm (bool): whether to use post normalization
layer_scale (float): layer scale
offset_scale (float): offset scale
with_cp (bool): whether to use checkpoint
"""
def __init__(self,
core_op,
channels,
groups,
mlp_ratio=4.,
drop=0.,
drop_path=0.,
act_layer='GELU',
norm_layer='LN',
post_norm=False,
layer_scale=None,
offset_scale=1.0,
with_cp=False,
dw_kernel_size=None, # for InternImage-H/G
res_post_norm=False, # for InternImage-H/G
center_feature_scale=False): # for InternImage-H/G
super().__init__()
self.channels = channels
self.groups = groups
self.mlp_ratio = mlp_ratio
self.with_cp = with_cp
self.norm1 = build_norm_layer(channels, 'LN')
self.post_norm = post_norm
self.dcn = core_op(
channels=channels,
kernel_size=3,
stride=1,
pad=1,
dilation=1,
group=groups,
offset_scale=offset_scale,
act_layer=act_layer,
norm_layer=norm_layer,
dw_kernel_size=dw_kernel_size, # for InternImage-H/G
center_feature_scale=center_feature_scale) # for InternImage-H/G
self.drop_path = DropPath(drop_path) if drop_path > 0. \
else nn.Identity()
self.norm2 = build_norm_layer(channels, 'LN')
self.mlp = MLPLayer(in_features=channels,
hidden_features=int(channels * mlp_ratio),
act_layer=act_layer,
drop=drop)
self.layer_scale = layer_scale is not None
if self.layer_scale:
self.gamma1 = nn.Parameter(layer_scale * torch.ones(channels),
requires_grad=True)
self.gamma2 = nn.Parameter(layer_scale * torch.ones(channels),
requires_grad=True)
self.res_post_norm = res_post_norm
if res_post_norm:
self.res_post_norm1 = build_norm_layer(channels, 'LN')
self.res_post_norm2 = build_norm_layer(channels, 'LN')
def forward(self, x):
def _inner_forward(x):
if not self.layer_scale:
if self.post_norm:
x = x + self.drop_path(self.norm1(self.dcn(x)))
x = x + self.drop_path(self.norm2(self.mlp(x)))
elif self.res_post_norm: # for InternImage-H/G
x = x + self.drop_path(self.res_post_norm1(self.dcn(self.norm1(x))))
x = x + self.drop_path(self.res_post_norm2(self.mlp(self.norm2(x))))
else:
x = x + self.drop_path(self.dcn(self.norm1(x)))
x = x + self.drop_path(self.mlp(self.norm2(x)))
return x
if self.post_norm:
x = x + self.drop_path(self.gamma1 * self.norm1(self.dcn(x)))
x = x + self.drop_path(self.gamma2 * self.norm2(self.mlp(x)))
else:
x = x + self.drop_path(self.gamma1 * self.dcn(self.norm1(x)))
x = x + self.drop_path(self.gamma2 * self.mlp(self.norm2(x)))
return x
if self.with_cp and x.requires_grad:
x = checkpoint.checkpoint(_inner_forward, x)
else:
x = _inner_forward(x)
return x
class InternImageBlock(nn.Module):
r""" Block of InternImage
Args:
core_op (nn.Module): core operation of InternImage
channels (int): number of input channels
depths (list): Depth of each block.
groups (list): Groups of each block.
mlp_ratio (float): ratio of mlp hidden features to input channels
drop (float): dropout rate
drop_path (float): drop path rate
act_layer (str): activation layer
norm_layer (str): normalization layer
post_norm (bool): whether to use post normalization
layer_scale (float): layer scale
offset_scale (float): offset scale
with_cp (bool): whether to use checkpoint
"""
def __init__(self,
core_op,
channels,
depth,
groups,
downsample=True,
mlp_ratio=4.,
drop=0.,
drop_path=0.,
act_layer='GELU',
norm_layer='LN',
post_norm=False,
offset_scale=1.0,
layer_scale=None,
with_cp=False,
dw_kernel_size=None, # for InternImage-H/G
post_norm_block_ids=None, # for InternImage-H/G
res_post_norm=False, # for InternImage-H/G
center_feature_scale=False): # for InternImage-H/G
super().__init__()
self.channels = channels
self.depth = depth
self.post_norm = post_norm
self.center_feature_scale = center_feature_scale
self.blocks = nn.ModuleList([
InternImageLayer(
core_op=core_op,
channels=channels,
groups=groups,
mlp_ratio=mlp_ratio,
drop=drop,
drop_path=drop_path[i] if isinstance(
drop_path, list) else drop_path,
act_layer=act_layer,
norm_layer=norm_layer,
post_norm=post_norm,
layer_scale=layer_scale,
offset_scale=offset_scale,
with_cp=with_cp,
dw_kernel_size=dw_kernel_size, # for InternImage-H/G
res_post_norm=res_post_norm, # for InternImage-H/G
center_feature_scale=center_feature_scale # for InternImage-H/G
) for i in range(depth)
])
if not self.post_norm or center_feature_scale:
self.norm = build_norm_layer(channels, 'LN')
self.post_norm_block_ids = post_norm_block_ids
if post_norm_block_ids is not None: # for InternImage-H/G
self.post_norms = nn.ModuleList(
[build_norm_layer(channels, 'LN', eps=1e-6) for _ in post_norm_block_ids]
)
self.downsample = DownsampleLayer(
channels=channels, norm_layer=norm_layer) if downsample else None
def forward(self, x, return_wo_downsample=False):
for i, blk in enumerate(self.blocks):
x = blk(x)
if (self.post_norm_block_ids is not None) and (i in self.post_norm_block_ids):
index = self.post_norm_block_ids.index(i)
x = self.post_norms[index](x) # for InternImage-H/G
if not self.post_norm or self.center_feature_scale:
x = self.norm(x)
if return_wo_downsample:
x_ = x
if self.downsample is not None:
x = self.downsample(x)
if return_wo_downsample:
return x, x_
return x
@BACKBONES.register_module()
class InternImage(nn.Module):
r""" InternImage
A PyTorch impl of : `InternImage: Exploring Large-Scale Vision Foundation Models with Deformable Convolutions` -
    https://arxiv.org/abs/2211.05778
Args:
core_op (str): Core operator. Default: 'DCNv3'
        channels (int): Number of channels in the first stage. Default: 64
depths (list): Depth of each block. Default: [3, 4, 18, 5]
groups (list): Groups of each block. Default: [3, 6, 12, 24]
mlp_ratio (float): Ratio of mlp hidden dim to embedding dim. Default: 4.
drop_rate (float): Probability of an element to be zeroed. Default: 0.
drop_path_rate (float): Stochastic depth rate. Default: 0.
act_layer (str): Activation layer. Default: 'GELU'
norm_layer (str): Normalization layer. Default: 'LN'
layer_scale (bool): Whether to use layer scale. Default: False
cls_scale (bool): Whether to use class scale. Default: False
        with_cp (bool): Use checkpoint or not. Using checkpoint will save some
            memory while slowing down the training speed. Default: False
dw_kernel_size (int): Size of the dwconv. Default: None
level2_post_norm (bool): Whether to use level2 post norm. Default: False
level2_post_norm_block_ids (list): Indexes of post norm blocks. Default: None
res_post_norm (bool): Whether to use res post norm. Default: False
center_feature_scale (bool): Whether to use center feature scale. Default: False
"""
def __init__(self,
core_op='DCNv3',
channels=64,
depths=[3, 4, 18, 5],
groups=[3, 6, 12, 24],
mlp_ratio=4.,
drop_rate=0.,
drop_path_rate=0.2,
drop_path_type='linear',
act_layer='GELU',
norm_layer='LN',
layer_scale=None,
offset_scale=1.0,
post_norm=False,
with_cp=False,
dw_kernel_size=None, # for InternImage-H/G
level2_post_norm=False, # for InternImage-H/G
level2_post_norm_block_ids=None, # for InternImage-H/G
res_post_norm=False, # for InternImage-H/G
center_feature_scale=False, # for InternImage-H/G
out_indices=(0, 1, 2, 3),
init_cfg=None,
**kwargs):
super().__init__()
self.core_op = core_op
self.num_levels = len(depths)
self.depths = depths
self.channels = channels
self.num_features = int(channels * 2**(self.num_levels - 1))
self.post_norm = post_norm
self.mlp_ratio = mlp_ratio
self.init_cfg = init_cfg
self.out_indices = out_indices
self.level2_post_norm_block_ids = level2_post_norm_block_ids
logger = get_root_logger()
logger.info(f'using core type: {core_op}')
logger.info(f'using activation layer: {act_layer}')
logger.info(f'using main norm layer: {norm_layer}')
logger.info(f'using dpr: {drop_path_type}, {drop_path_rate}')
logger.info(f"level2_post_norm: {level2_post_norm}")
logger.info(f"level2_post_norm_block_ids: {level2_post_norm_block_ids}")
logger.info(f"res_post_norm: {res_post_norm}")
in_chans = 3
self.patch_embed = StemLayer(in_chans=in_chans,
out_chans=channels,
act_layer=act_layer,
norm_layer=norm_layer)
self.pos_drop = nn.Dropout(p=drop_rate)
dpr = [
x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))
]
if drop_path_type == 'uniform':
for i in range(len(dpr)):
dpr[i] = drop_path_rate
self.levels = nn.ModuleList()
for i in range(self.num_levels):
post_norm_block_ids = level2_post_norm_block_ids if level2_post_norm and (
i == 2) else None # for InternImage-H/G
level = InternImageBlock(
core_op=getattr(opsm, core_op),
channels=int(channels * 2**i),
depth=depths[i],
groups=groups[i],
mlp_ratio=self.mlp_ratio,
drop=drop_rate,
drop_path=dpr[sum(depths[:i]):sum(depths[:i + 1])],
act_layer=act_layer,
norm_layer=norm_layer,
post_norm=post_norm,
downsample=(i < self.num_levels - 1),
layer_scale=layer_scale,
offset_scale=offset_scale,
with_cp=with_cp,
dw_kernel_size=dw_kernel_size, # for InternImage-H/G
post_norm_block_ids=post_norm_block_ids, # for InternImage-H/G
res_post_norm=res_post_norm, # for InternImage-H/G
center_feature_scale=center_feature_scale # for InternImage-H/G
)
self.levels.append(level)
self.num_layers = len(depths)
self.apply(self._init_weights)
self.apply(self._init_deform_weights)
def init_weights(self):
logger = get_root_logger()
if self.init_cfg is None:
            logger.warning(f'No pre-trained weights for '
                           f'{self.__class__.__name__}, '
                           f'training starts from scratch')
for m in self.modules():
if isinstance(m, nn.Linear):
trunc_normal_init(m, std=.02, bias=0.)
elif isinstance(m, nn.LayerNorm):
constant_init(m, 1.0)
else:
            assert 'checkpoint' in self.init_cfg, \
                f'Only `Pretrained` with a `checkpoint` key is supported in ' \
                f'`init_cfg` of {self.__class__.__name__}'
ckpt = _load_checkpoint(self.init_cfg.checkpoint,
logger=logger,
map_location='cpu')
if 'state_dict' in ckpt:
_state_dict = ckpt['state_dict']
elif 'model' in ckpt:
_state_dict = ckpt['model']
else:
_state_dict = ckpt
state_dict = OrderedDict()
for k, v in _state_dict.items():
if k.startswith('backbone.'):
state_dict[k[9:]] = v
else:
state_dict[k] = v
# strip prefix of state_dict
if list(state_dict.keys())[0].startswith('module.'):
state_dict = {k[7:]: v for k, v in state_dict.items()}
# load state_dict
            msg = self.load_state_dict(state_dict, strict=False)
            logger.info(msg)
def _init_weights(self, m):
if isinstance(m, nn.Linear):
trunc_normal_(m.weight, std=.02)
if isinstance(m, nn.Linear) and m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.LayerNorm):
nn.init.constant_(m.bias, 0)
nn.init.constant_(m.weight, 1.0)
def _init_deform_weights(self, m):
if isinstance(m, getattr(opsm, self.core_op)):
m._reset_parameters()
def forward(self, x):
x = self.patch_embed(x)
x = self.pos_drop(x)
seq_out = []
for level_idx, level in enumerate(self.levels):
x, x_ = level(x, return_wo_downsample=True)
if level_idx in self.out_indices:
seq_out.append(x_.permute(0, 3, 1, 2).contiguous())
return seq_out
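

# Hedged usage sketch (an illustrative addition, not part of the original
# file). It assumes the backbone class defined above is exported as
# `InternImage` and that the DCNv3 CUDA extension behind `opsm` is built;
# shapes follow forward() above: one channels-first map per out_indices entry.
if __name__ == '__main__':
    backbone = InternImage(core_op='DCNv3',
                           channels=64,
                           depths=[3, 4, 18, 5],
                           groups=[3, 6, 12, 24],
                           out_indices=(0, 1, 2, 3))
    feats = backbone(torch.randn(1, 3, 224, 224))
    for i, feat in enumerate(feats):
        # The stem downsamples by 4 and each later level by 2; channels double.
        print(i, feat.shape)  # expect (1, 64 * 2**i, 224 // (4 * 2**i), ...)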
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
from .dcnv3_func import DCNv3Function, dcnv3_core_pytorch
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import torch
import torch.nn.functional as F
from torch.autograd import Function
from torch.autograd.function import once_differentiable
from torch.cuda.amp import custom_bwd, custom_fwd
import DCNv3
class DCNv3Function(Function):
@staticmethod
@custom_fwd
def forward(
ctx, input, offset, mask,
kernel_h, kernel_w, stride_h, stride_w,
pad_h, pad_w, dilation_h, dilation_w,
group, group_channels, offset_scale, im2col_step):
ctx.kernel_h = kernel_h
ctx.kernel_w = kernel_w
ctx.stride_h = stride_h
ctx.stride_w = stride_w
ctx.pad_h = pad_h
ctx.pad_w = pad_w
ctx.dilation_h = dilation_h
ctx.dilation_w = dilation_w
ctx.group = group
ctx.group_channels = group_channels
ctx.offset_scale = offset_scale
ctx.im2col_step = im2col_step
output = DCNv3.dcnv3_forward(
input, offset, mask, kernel_h,
kernel_w, stride_h, stride_w, pad_h,
pad_w, dilation_h, dilation_w, group,
group_channels, offset_scale, ctx.im2col_step)
ctx.save_for_backward(input, offset, mask)
return output
@staticmethod
@once_differentiable
@custom_bwd
def backward(ctx, grad_output):
input, offset, mask = ctx.saved_tensors
grad_input, grad_offset, grad_mask = \
DCNv3.dcnv3_backward(
input, offset, mask, ctx.kernel_h,
ctx.kernel_w, ctx.stride_h, ctx.stride_w, ctx.pad_h,
ctx.pad_w, ctx.dilation_h, ctx.dilation_w, ctx.group,
ctx.group_channels, ctx.offset_scale, grad_output.contiguous(), ctx.im2col_step)
return grad_input, grad_offset, grad_mask, \
None, None, None, None, None, None, None, None, None, None, None, None
@staticmethod
def symbolic(g, input, offset, mask, kernel_h, kernel_w, stride_h,
stride_w, pad_h, pad_w, dilation_h, dilation_w, group,
group_channels, offset_scale, im2col_step):
"""Symbolic function for mmdeploy::DCNv3.
Returns:
DCNv3 op for onnx.
"""
return g.op(
'mmdeploy::TRTDCNv3',
input,
offset,
mask,
kernel_h_i=int(kernel_h),
kernel_w_i=int(kernel_w),
stride_h_i=int(stride_h),
stride_w_i=int(stride_w),
pad_h_i=int(pad_h),
pad_w_i=int(pad_w),
dilation_h_i=int(dilation_h),
dilation_w_i=int(dilation_w),
group_i=int(group),
group_channels_i=int(group_channels),
offset_scale_f=float(offset_scale),
im2col_step_i=int(im2col_step),
)
def _get_reference_points(spatial_shapes, device, kernel_h, kernel_w, dilation_h, dilation_w, pad_h=0, pad_w=0, stride_h=1, stride_w=1):
_, H_, W_, _ = spatial_shapes
H_out = (H_ - (dilation_h * (kernel_h - 1) + 1)) // stride_h + 1
W_out = (W_ - (dilation_w * (kernel_w - 1) + 1)) // stride_w + 1
ref_y, ref_x = torch.meshgrid(
torch.linspace(
# pad_h + 0.5,
# H_ - pad_h - 0.5,
(dilation_h * (kernel_h - 1)) // 2 + 0.5,
(dilation_h * (kernel_h - 1)) // 2 + 0.5 + (H_out - 1) * stride_h,
H_out,
dtype=torch.float32,
device=device),
torch.linspace(
# pad_w + 0.5,
# W_ - pad_w - 0.5,
(dilation_w * (kernel_w - 1)) // 2 + 0.5,
(dilation_w * (kernel_w - 1)) // 2 + 0.5 + (W_out - 1) * stride_w,
W_out,
dtype=torch.float32,
device=device))
ref_y = ref_y.reshape(-1)[None] / H_
ref_x = ref_x.reshape(-1)[None] / W_
ref = torch.stack((ref_x, ref_y), -1).reshape(
1, H_out, W_out, 1, 2)
return ref
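# Worked example (added note): for kernel_h=3, dilation_h=1, stride_h=1 on a
# padded height H_=10, H_out = (10 - 3) // 1 + 1 = 8 and the sample centers
# sit at 1.5, 2.5, ..., 8.5 in pixel units before being normalized by H_.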
def _generate_dilation_grids(spatial_shapes, kernel_h, kernel_w, dilation_h, dilation_w, group, device):
_, H_, W_, _ = spatial_shapes
points_list = []
x, y = torch.meshgrid(
torch.linspace(
-((dilation_w * (kernel_w - 1)) // 2),
-((dilation_w * (kernel_w - 1)) // 2) +
(kernel_w - 1) * dilation_w, kernel_w,
dtype=torch.float32,
device=device),
torch.linspace(
-((dilation_h * (kernel_h - 1)) // 2),
-((dilation_h * (kernel_h - 1)) // 2) +
(kernel_h - 1) * dilation_h, kernel_h,
dtype=torch.float32,
device=device))
points_list.extend([x / W_, y / H_])
grid = torch.stack(points_list, -1).reshape(-1, 1, 2).\
repeat(1, group, 1).permute(1, 0, 2)
grid = grid.reshape(1, 1, 1, group * kernel_h * kernel_w, 2)
return grid
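# Worked example (added note): for a 3x3 kernel with dilation 1, the grid
# offsets are the 9 points with x, y in {-1, 0, 1}, normalized by W_ and H_
# and tiled per group into shape (1, 1, 1, group * 9, 2); these are added to
# the reference points from _get_reference_points.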
def dcnv3_core_pytorch(
input, offset, mask, kernel_h,
kernel_w, stride_h, stride_w, pad_h,
pad_w, dilation_h, dilation_w, group,
group_channels, offset_scale):
# for debug and test only,
# need to use cuda version instead
    # F.pad pads the last dimension first, so for an (N, H, W, C) input the
    # order is (C, C, W, W, H, H); the original (pad_h, pad_h, pad_w, pad_w)
    # order swapped H and W whenever pad_h != pad_w.
    input = F.pad(
        input,
        [0, 0, pad_w, pad_w, pad_h, pad_h])
N_, H_in, W_in, _ = input.shape
_, H_out, W_out, _ = offset.shape
ref = _get_reference_points(
input.shape, input.device, kernel_h, kernel_w, dilation_h, dilation_w, pad_h, pad_w, stride_h, stride_w)
grid = _generate_dilation_grids(
input.shape, kernel_h, kernel_w, dilation_h, dilation_w, group, input.device)
spatial_norm = torch.tensor([W_in, H_in]).reshape(1, 1, 1, 2).\
repeat(1, 1, 1, group*kernel_h*kernel_w).to(input.device)
sampling_locations = (ref + grid * offset_scale).repeat(N_, 1, 1, 1, 1).flatten(3, 4) + \
offset * offset_scale / spatial_norm
P_ = kernel_h * kernel_w
sampling_grids = 2 * sampling_locations - 1
# N_, H_in, W_in, group*group_channels -> N_, H_in*W_in, group*group_channels -> N_, group*group_channels, H_in*W_in -> N_*group, group_channels, H_in, W_in
input_ = input.view(N_, H_in*W_in, group*group_channels).transpose(1, 2).\
reshape(N_*group, group_channels, H_in, W_in)
# N_, H_out, W_out, group*P_*2 -> N_, H_out*W_out, group, P_, 2 -> N_, group, H_out*W_out, P_, 2 -> N_*group, H_out*W_out, P_, 2
sampling_grid_ = sampling_grids.view(N_, H_out*W_out, group, P_, 2).transpose(1, 2).\
flatten(0, 1)
# N_*group, group_channels, H_out*W_out, P_
sampling_input_ = F.grid_sample(
input_, sampling_grid_, mode='bilinear', padding_mode='zeros', align_corners=False)
# (N_, H_out, W_out, group*P_) -> N_, H_out*W_out, group, P_ -> (N_, group, H_out*W_out, P_) -> (N_*group, 1, H_out*W_out, P_)
mask = mask.view(N_, H_out*W_out, group, P_).transpose(1, 2).\
reshape(N_*group, 1, H_out*W_out, P_)
output = (sampling_input_ * mask).sum(-1).view(N_,
group*group_channels, H_out*W_out)
return output.transpose(1, 2).reshape(N_, H_out, W_out, -1).contiguous()
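

# Hedged shape sanity check (an illustrative addition, not part of the
# original file). It exercises the pure-PyTorch path above on CPU; the sizes
# are arbitrary and chosen so that H_out == H and W_out == W.
if __name__ == '__main__':
    N, H, W, group, group_channels, K = 2, 8, 8, 4, 16, 3
    x = torch.randn(N, H, W, group * group_channels)
    offset = torch.randn(N, H, W, group * K * K * 2)
    mask = F.softmax(torch.randn(N, H, W, group, K * K), -1).reshape(N, H, W, -1)
    out = dcnv3_core_pytorch(x, offset, mask,
                             K, K,  # kernel_h, kernel_w
                             1, 1,  # stride_h, stride_w
                             1, 1,  # pad_h, pad_w
                             1, 1,  # dilation_h, dilation_w
                             group, group_channels, offset_scale=1.0)
    assert out.shape == (N, H, W, group * group_channels)
    print(out.shape)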
#!/usr/bin/env bash
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
python setup.py build install
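
# Alternatively (an untested assumption, not in the original script), a
# pip-based build should work with the same setup.py:
#   pip install -v .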
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
from .dcnv3 import DCNv3, DCNv3_pytorch
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import warnings
import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.init import xavier_uniform_, constant_
from ..functions import DCNv3Function, dcnv3_core_pytorch
class to_channels_first(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 3, 1, 2)
class to_channels_last(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return x.permute(0, 2, 3, 1)
def build_norm_layer(dim,
norm_layer,
in_format='channels_last',
out_format='channels_last',
eps=1e-6):
layers = []
if norm_layer == 'BN':
if in_format == 'channels_last':
layers.append(to_channels_first())
layers.append(nn.BatchNorm2d(dim))
if out_format == 'channels_last':
layers.append(to_channels_last())
elif norm_layer == 'LN':
if in_format == 'channels_first':
layers.append(to_channels_last())
layers.append(nn.LayerNorm(dim, eps=eps))
if out_format == 'channels_first':
layers.append(to_channels_first())
else:
raise NotImplementedError(
f'build_norm_layer does not support {norm_layer}')
return nn.Sequential(*layers)
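# Example (added note): build_norm_layer(64, 'LN') returns
# nn.Sequential(nn.LayerNorm(64, eps=1e-6)) acting on channels-last input,
# while build_norm_layer(64, 'BN') wraps nn.BatchNorm2d(64) between the
# to_channels_first/to_channels_last helpers above.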
def build_act_layer(act_layer):
if act_layer == 'ReLU':
return nn.ReLU(inplace=True)
elif act_layer == 'SiLU':
return nn.SiLU(inplace=True)
elif act_layer == 'GELU':
return nn.GELU()
raise NotImplementedError(f'build_act_layer does not support {act_layer}')
def _is_power_of_2(n):
if (not isinstance(n, int)) or (n < 0):
raise ValueError(
"invalid input for _is_power_of_2: {} (type: {})".format(n, type(n)))
return (n & (n - 1) == 0) and n != 0
class CenterFeatureScaleModule(nn.Module):
def forward(self,
query,
center_feature_scale_proj_weight,
center_feature_scale_proj_bias):
center_feature_scale = F.linear(query,
weight=center_feature_scale_proj_weight,
bias=center_feature_scale_proj_bias).sigmoid()
return center_feature_scale
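# Shape note (added): with query of shape (N, H, W, C) and a (group, C)
# projection weight, F.linear yields a per-group gate in (0, 1) of shape
# (N, H, W, group), later broadcast over each group's channels in forward().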
class DCNv3_pytorch(nn.Module):
def __init__(
self,
channels=64,
kernel_size=3,
dw_kernel_size=None,
stride=1,
pad=1,
dilation=1,
group=4,
offset_scale=1.0,
act_layer='GELU',
norm_layer='LN',
center_feature_scale=False):
"""
        DCNv3 Module (pure-PyTorch reference implementation)
        :param channels: feature channels
        :param kernel_size: deformable sampling kernel size
        :param dw_kernel_size: depthwise conv kernel size; defaults to kernel_size
        :param stride: sampling stride
        :param pad: input padding
        :param dilation: sampling dilation
        :param group: number of deformable groups
        :param offset_scale: scale applied to the predicted offsets
        :param act_layer: activation layer name
        :param norm_layer: normalization layer name
        :param center_feature_scale: whether to use center feature scale
"""
super().__init__()
if channels % group != 0:
raise ValueError(
f'channels must be divisible by group, but got {channels} and {group}')
_d_per_group = channels // group
dw_kernel_size = dw_kernel_size if dw_kernel_size is not None else kernel_size
        # channels // group should be a power of 2 for best efficiency in the
        # CUDA implementation.
        if not _is_power_of_2(_d_per_group):
            warnings.warn(
                'Setting channels so that channels // group is a power of 2 '
                'is more efficient in the CUDA implementation.')
self.channels = channels
self.kernel_size = kernel_size
self.dw_kernel_size = dw_kernel_size
self.stride = stride
self.dilation = dilation
self.pad = pad
self.group = group
self.group_channels = channels // group
self.offset_scale = offset_scale
self.center_feature_scale = center_feature_scale
self.dw_conv = nn.Sequential(
nn.Conv2d(
channels,
channels,
kernel_size=dw_kernel_size,
stride=1,
padding=(dw_kernel_size - 1) // 2,
groups=channels),
build_norm_layer(
channels,
norm_layer,
'channels_first',
'channels_last'),
build_act_layer(act_layer))
self.offset = nn.Linear(
channels,
group * kernel_size * kernel_size * 2)
self.mask = nn.Linear(
channels,
group * kernel_size * kernel_size)
self.input_proj = nn.Linear(channels, channels)
self.output_proj = nn.Linear(channels, channels)
self._reset_parameters()
if center_feature_scale:
self.center_feature_scale_proj_weight = nn.Parameter(
torch.zeros((group, channels), dtype=torch.float))
self.center_feature_scale_proj_bias = nn.Parameter(
torch.tensor(0.0, dtype=torch.float).view((1,)).repeat(group, ))
self.center_feature_scale_module = CenterFeatureScaleModule()
def _reset_parameters(self):
constant_(self.offset.weight.data, 0.)
constant_(self.offset.bias.data, 0.)
constant_(self.mask.weight.data, 0.)
constant_(self.mask.bias.data, 0.)
xavier_uniform_(self.input_proj.weight.data)
constant_(self.input_proj.bias.data, 0.)
xavier_uniform_(self.output_proj.weight.data)
constant_(self.output_proj.bias.data, 0.)
def forward(self, input):
"""
        :param input: input feature of shape (N, H, W, C)
        :return: output feature of shape (N, H, W, C)
"""
N, H, W, _ = input.shape
x = self.input_proj(input)
x_proj = x
x1 = input.permute(0, 3, 1, 2)
x1 = self.dw_conv(x1)
offset = self.offset(x1)
mask = self.mask(x1).reshape(N, H, W, self.group, -1)
mask = F.softmax(mask, -1).reshape(N, H, W, -1)
x = dcnv3_core_pytorch(
x, offset, mask,
self.kernel_size, self.kernel_size,
self.stride, self.stride,
self.pad, self.pad,
self.dilation, self.dilation,
self.group, self.group_channels,
self.offset_scale)
if self.center_feature_scale:
center_feature_scale = self.center_feature_scale_module(
x1, self.center_feature_scale_proj_weight, self.center_feature_scale_proj_bias)
# N, H, W, groups -> N, H, W, groups, 1 -> N, H, W, groups, _d_per_group -> N, H, W, channels
center_feature_scale = center_feature_scale[..., None].repeat(
1, 1, 1, 1, self.channels // self.group).flatten(-2)
x = x * (1 - center_feature_scale) + x_proj * center_feature_scale
x = self.output_proj(x)
return x
class DCNv3(nn.Module):
def __init__(
self,
channels=64,
kernel_size=3,
dw_kernel_size=None,
stride=1,
pad=1,
dilation=1,
group=4,
offset_scale=1.0,
act_layer='GELU',
norm_layer='LN',
center_feature_scale=False):
"""
        DCNv3 Module (CUDA implementation; requires the compiled DCNv3 extension)
        :param channels: feature channels
        :param kernel_size: deformable sampling kernel size
        :param dw_kernel_size: depthwise conv kernel size; defaults to kernel_size
        :param stride: sampling stride
        :param pad: input padding
        :param dilation: sampling dilation
        :param group: number of deformable groups
        :param offset_scale: scale applied to the predicted offsets
        :param act_layer: activation layer name
        :param norm_layer: normalization layer name
        :param center_feature_scale: whether to use center feature scale
"""
super().__init__()
if channels % group != 0:
raise ValueError(
f'channels must be divisible by group, but got {channels} and {group}')
_d_per_group = channels // group
dw_kernel_size = dw_kernel_size if dw_kernel_size is not None else kernel_size
        # channels // group should be a power of 2 for best efficiency in the
        # CUDA implementation.
        if not _is_power_of_2(_d_per_group):
            warnings.warn(
                'Setting channels so that channels // group is a power of 2 '
                'is more efficient in the CUDA implementation.')
self.channels = channels
self.kernel_size = kernel_size
self.dw_kernel_size = dw_kernel_size
self.stride = stride
self.dilation = dilation
self.pad = pad
self.group = group
self.group_channels = channels // group
self.offset_scale = offset_scale
self.center_feature_scale = center_feature_scale
self.dw_conv = nn.Sequential(
nn.Conv2d(
channels,
channels,
kernel_size=dw_kernel_size,
stride=1,
padding=(dw_kernel_size - 1) // 2,
groups=channels),
build_norm_layer(
channels,
norm_layer,
'channels_first',
'channels_last'),
build_act_layer(act_layer))
self.offset = nn.Linear(
channels,
group * kernel_size * kernel_size * 2)
self.mask = nn.Linear(
channels,
group * kernel_size * kernel_size)
self.input_proj = nn.Linear(channels, channels)
self.output_proj = nn.Linear(channels, channels)
self._reset_parameters()
if center_feature_scale:
self.center_feature_scale_proj_weight = nn.Parameter(
torch.zeros((group, channels), dtype=torch.float))
self.center_feature_scale_proj_bias = nn.Parameter(
torch.tensor(0.0, dtype=torch.float).view((1,)).repeat(group, ))
self.center_feature_scale_module = CenterFeatureScaleModule()
def _reset_parameters(self):
constant_(self.offset.weight.data, 0.)
constant_(self.offset.bias.data, 0.)
constant_(self.mask.weight.data, 0.)
constant_(self.mask.bias.data, 0.)
xavier_uniform_(self.input_proj.weight.data)
constant_(self.input_proj.bias.data, 0.)
xavier_uniform_(self.output_proj.weight.data)
constant_(self.output_proj.bias.data, 0.)
def forward(self, input):
"""
        :param input: input feature of shape (N, H, W, C)
        :return: output feature of shape (N, H, W, C)
"""
N, H, W, _ = input.shape
x = self.input_proj(input)
x_proj = x
dtype = x.dtype
x1 = input.permute(0, 3, 1, 2)
x1 = self.dw_conv(x1)
offset = self.offset(x1)
mask = self.mask(x1).reshape(N, H, W, self.group, -1)
mask = F.softmax(mask, -1).reshape(N, H, W, -1).type(dtype)
x = DCNv3Function.apply(
x, offset, mask,
self.kernel_size, self.kernel_size,
self.stride, self.stride,
self.pad, self.pad,
self.dilation, self.dilation,
self.group, self.group_channels,
self.offset_scale,
256)
if self.center_feature_scale:
center_feature_scale = self.center_feature_scale_module(
x1, self.center_feature_scale_proj_weight, self.center_feature_scale_proj_bias)
# N, H, W, groups -> N, H, W, groups, 1 -> N, H, W, groups, _d_per_group -> N, H, W, channels
center_feature_scale = center_feature_scale[..., None].repeat(
1, 1, 1, 1, self.channels // self.group).flatten(-2)
x = x * (1 - center_feature_scale) + x_proj * center_feature_scale
x = self.output_proj(x)
return x
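

# Hedged parity sketch (an illustrative addition, not part of the original
# file). DCNv3_pytorch runs on CPU; the DCNv3 branch assumes the CUDA
# extension has been built via ./make.sh. The two modules share parameter
# names, so the reference weights can be copied across directly.
if __name__ == '__main__':
    x = torch.randn(2, 16, 16, 64)
    ref = DCNv3_pytorch(channels=64, group=4)
    print(ref(x).shape)  # (2, 16, 16, 64) with the stride-1 / pad-1 defaults
    if torch.cuda.is_available():
        core = DCNv3(channels=64, group=4).cuda()
        core.load_state_dict(ref.state_dict())
        diff = (core(x.cuda()) - ref.cuda()(x.cuda())).abs().max()
        print('max abs diff vs. CUDA op:', diff.item())  # expected near zero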
# --------------------------------------------------------
# InternImage
# Copyright (c) 2022 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
import os
import glob
import torch
from torch.utils.cpp_extension import CUDA_HOME
from torch.utils.cpp_extension import CppExtension
from torch.utils.cpp_extension import CUDAExtension
from setuptools import find_packages
from setuptools import setup
requirements = ["torch", "torchvision"]
def get_extensions():
this_dir = os.path.dirname(os.path.abspath(__file__))
extensions_dir = os.path.join(this_dir, "src")
main_file = glob.glob(os.path.join(extensions_dir, "*.cpp"))
source_cpu = glob.glob(os.path.join(extensions_dir, "cpu", "*.cpp"))
source_cuda = glob.glob(os.path.join(extensions_dir, "cuda", "*.cu"))
sources = main_file + source_cpu
extension = CppExtension
extra_compile_args = {"cxx": []}
define_macros = []
if torch.cuda.is_available() and CUDA_HOME is not None:
extension = CUDAExtension
sources += source_cuda
define_macros += [("WITH_CUDA", None)]
extra_compile_args["nvcc"] = [
# "-DCUDA_HAS_FP16=1",
# "-D__CUDA_NO_HALF_OPERATORS__",
# "-D__CUDA_NO_HALF_CONVERSIONS__",
# "-D__CUDA_NO_HALF2_OPERATORS__",
]
else:
        raise NotImplementedError('CUDA is not available')
sources = [os.path.join(extensions_dir, s) for s in sources]
include_dirs = [extensions_dir]
ext_modules = [
extension(
"DCNv3",
sources,
include_dirs=include_dirs,
define_macros=define_macros,
extra_compile_args=extra_compile_args,
)
]
return ext_modules
setup(
name="DCNv3",
version="1.0",
author="InternImage",
url="https://github.com/OpenGVLab/InternImage",
    description="PyTorch Wrapper for CUDA Functions of DCNv3",
packages=find_packages(exclude=(
"configs",
"tests",
)),
ext_modules=get_extensions(),
cmdclass={"build_ext": torch.utils.cpp_extension.BuildExtension},
)
/*!
**************************************************************************************************
* InternImage
* Copyright (c) 2022 OpenGVLab
* Licensed under The MIT License [see LICENSE for details]
**************************************************************************************************
* Modified from
*https://github.com/chengdazhi/Deformable-Convolution-V2-PyTorch/tree/pytorch_1.0.0
**************************************************************************************************
*/
#include <vector>
// No CUDA headers are needed in this CPU-only translation unit.
#include <ATen/ATen.h>
at::Tensor dcnv3_cpu_forward(const at::Tensor &input, const at::Tensor &offset,
const at::Tensor &mask, const int kernel_h,
const int kernel_w, const int stride_h,
const int stride_w, const int pad_h,
const int pad_w, const int dilation_h,
const int dilation_w, const int group,
const int group_channels, const float offset_scale,
const int im2col_step) {
AT_ERROR("Not implement on cpu");
}
std::vector<at::Tensor>
dcnv3_cpu_backward(const at::Tensor &input, const at::Tensor &offset,
const at::Tensor &mask, const int kernel_h,
const int kernel_w, const int stride_h, const int stride_w,
const int pad_h, const int pad_w, const int dilation_h,
const int dilation_w, const int group,
const int group_channels, const float offset_scale,
const at::Tensor &grad_output, const int im2col_step) {
AT_ERROR("Not implement on cpu");
}
/*!
**************************************************************************************************
* InternImage
* Copyright (c) 2022 OpenGVLab
* Licensed under The MIT License [see LICENSE for details]
**************************************************************************************************
* Modified from
*https://github.com/chengdazhi/Deformable-Convolution-V2-PyTorch/tree/pytorch_1.0.0
**************************************************************************************************
*/
#pragma once
#include <torch/extension.h>
at::Tensor dcnv3_cpu_forward(const at::Tensor &input, const at::Tensor &offset,
const at::Tensor &mask, const int kernel_h,
const int kernel_w, const int stride_h,
const int stride_w, const int pad_h,
const int pad_w, const int dilation_h,
const int dilation_w, const int group,
const int group_channels, const float offset_scale,
const int im2col_step);
std::vector<at::Tensor>
dcnv3_cpu_backward(const at::Tensor &input, const at::Tensor &offset,
const at::Tensor &mask, const int kernel_h,
const int kernel_w, const int stride_h, const int stride_w,
const int pad_h, const int pad_w, const int dilation_h,
const int dilation_w, const int group,
const int group_channels, const float offset_scale,
const at::Tensor &grad_output, const int im2col_step);