Commit 4cd43886 authored by lishj6

init

parent a9a1fe81
# Copyright 2021 Toyota Research Institute. All rights reserved.
import torch
from detectron2.layers import cat
from projects.mmdet3d_plugin.dd3d.structures.boxes3d import Boxes3D
INF = 100000000.
class DD3DTargetPreparer():
def __init__(self,
num_classes,
input_shape,
box3d_on=True,
center_sample=True,
pos_radius=1.5,
sizes_of_interest=None):
self.num_classes = num_classes
self.center_sample = center_sample
self.strides = [shape.stride for shape in input_shape]
self.radius = pos_radius
self.dd3d_enabled = box3d_on
# generate sizes of interest
# NOTE: `sizes_of_interest` is expected to already be a list of per-level (lower, upper)
# ranges; the commented code below shows how such ranges can be generated from a list of
# per-level size boundaries:
# soi = []
# prev_size = -1
# for s in sizes_of_interest:
# soi.append([prev_size, s])
# prev_size = s
# soi.append([prev_size, INF])
self.sizes_of_interest = sizes_of_interest
def __call__(self, locations, gt_instances, feature_shapes):
num_loc_list = [len(loc) for loc in locations]
# compute locations to size ranges
loc_to_size_range = []
for l, loc_per_level in enumerate(locations):
loc_to_size_range_per_level = loc_per_level.new_tensor(self.sizes_of_interest[l])
loc_to_size_range.append(loc_to_size_range_per_level[None].expand(num_loc_list[l], -1))
loc_to_size_range = torch.cat(loc_to_size_range, dim=0)
locations = torch.cat(locations, dim=0)
training_targets = self.compute_targets_for_locations(locations, gt_instances, loc_to_size_range, num_loc_list)
training_targets["locations"] = [locations.clone() for _ in range(len(gt_instances))]
training_targets["im_inds"] = [
locations.new_ones(locations.size(0), dtype=torch.long) * i for i in range(len(gt_instances))
]
box2d = training_targets.pop("box2d", None)
# transpose image-first training_targets to level-first ones
training_targets = {k: self._transpose(v, num_loc_list) for k, v in training_targets.items() if k != "box2d"}
training_targets["fpn_levels"] = [
loc.new_ones(len(loc), dtype=torch.long) * level for level, loc in enumerate(training_targets["locations"])
]
# Flatten targets: (L x B x H x W, TARGET_SIZE)
labels = cat([x.reshape(-1) for x in training_targets["labels"]])
box2d_reg_targets = cat([x.reshape(-1, 4) for x in training_targets["box2d_reg"]])
target_inds = cat([x.reshape(-1) for x in training_targets["target_inds"]])
locations = cat([x.reshape(-1, 2) for x in training_targets["locations"]])
im_inds = cat([x.reshape(-1) for x in training_targets["im_inds"]])
fpn_levels = cat([x.reshape(-1) for x in training_targets["fpn_levels"]])
pos_inds = torch.nonzero(labels != self.num_classes).squeeze(1)
targets = {
"labels": labels,
"box2d_reg_targets": box2d_reg_targets,
"locations": locations,
"target_inds": target_inds,
"im_inds": im_inds,
"fpn_levels": fpn_levels,
"pos_inds": pos_inds
}
if self.dd3d_enabled:
box3d_targets = Boxes3D.cat(training_targets["box3d"])
targets.update({"box3d_targets": box3d_targets})
if box2d is not None:
# Original format is B x L x (H x W, 4)
# Need to be in L x (B, 4, H, W).
batched_box2d = []
for lvl, per_lvl_box2d in enumerate(zip(*box2d)):
# B x (H x W, 4)
h, w = feature_shapes[lvl]
batched_box2d_lvl = torch.stack([x.T.reshape(4, h, w) for x in per_lvl_box2d], dim=0)
batched_box2d.append(batched_box2d_lvl)
targets.update({"batched_box2d": batched_box2d})
return targets
def compute_targets_for_locations(self, locations, targets, size_ranges, num_loc_list):
labels = []
box2d_reg = []
if self.dd3d_enabled:
box3d = []
target_inds = []
xs, ys = locations[:, 0], locations[:, 1]
num_targets = 0
for im_i in range(len(targets)):
targets_per_im = targets[im_i]
bboxes = targets_per_im.gt_boxes.tensor
labels_per_im = targets_per_im.gt_classes
# no gt
if bboxes.numel() == 0:
labels.append(labels_per_im.new_zeros(locations.size(0)) + self.num_classes)
# reg_targets.append(locations.new_zeros((locations.size(0), 4)))
box2d_reg.append(locations.new_zeros((locations.size(0), 4)))
target_inds.append(labels_per_im.new_zeros(locations.size(0)) - 1)
if self.dd3d_enabled:
box3d.append(
Boxes3D(
locations.new_zeros(locations.size(0), 4),
locations.new_zeros(locations.size(0), 2),
locations.new_zeros(locations.size(0), 1),
locations.new_zeros(locations.size(0), 3),
locations.new_zeros(locations.size(0), 3, 3),
).to(torch.float32)
)
continue
area = targets_per_im.gt_boxes.area()
l = xs[:, None] - bboxes[:, 0][None]
t = ys[:, None] - bboxes[:, 1][None]
r = bboxes[:, 2][None] - xs[:, None]
b = bboxes[:, 3][None] - ys[:, None]
# reg_targets_per_im = torch.stack([l, t, r, b], dim=2)
box2d_reg_per_im = torch.stack([l, t, r, b], dim=2)
if self.center_sample:
is_in_boxes = self.get_sample_region(bboxes, num_loc_list, xs, ys)
else:
is_in_boxes = box2d_reg_per_im.min(dim=2)[0] > 0
max_reg_targets_per_im = box2d_reg_per_im.max(dim=2)[0]
# limit the regression range for each location
is_cared_in_the_level = \
(max_reg_targets_per_im >= size_ranges[:, [0]]) & \
(max_reg_targets_per_im <= size_ranges[:, [1]])
locations_to_gt_area = area[None].repeat(len(locations), 1)
locations_to_gt_area[is_in_boxes == 0] = INF
locations_to_gt_area[is_cared_in_the_level == 0] = INF
# if there is still more than one object for a location,
# choose the one with minimal area
locations_to_min_area, locations_to_gt_inds = locations_to_gt_area.min(dim=1)
box2d_reg_per_im = box2d_reg_per_im[range(len(locations)), locations_to_gt_inds]
target_inds_per_im = locations_to_gt_inds + num_targets
num_targets += len(targets_per_im)
labels_per_im = labels_per_im[locations_to_gt_inds]
labels_per_im[locations_to_min_area == INF] = self.num_classes
labels.append(labels_per_im)
box2d_reg.append(box2d_reg_per_im)
target_inds.append(target_inds_per_im)
if self.dd3d_enabled:
# 3D box targets
box3d_per_im = targets_per_im.gt_boxes3d[locations_to_gt_inds]
box3d.append(box3d_per_im)
ret = {"labels": labels, "box2d_reg": box2d_reg, "target_inds": target_inds}
if self.dd3d_enabled:
ret.update({"box3d": box3d})
return ret
def get_sample_region(self, boxes, num_loc_list, loc_xs, loc_ys):
center_x = boxes[..., [0, 2]].sum(dim=-1) * 0.5
center_y = boxes[..., [1, 3]].sum(dim=-1) * 0.5
num_gts = boxes.shape[0]
K = len(loc_xs)
boxes = boxes[None].expand(K, num_gts, 4)
center_x = center_x[None].expand(K, num_gts)
center_y = center_y[None].expand(K, num_gts)
center_gt = boxes.new_zeros(boxes.shape)
# no gt
if center_x.numel() == 0 or center_x[..., 0].sum() == 0:
return loc_xs.new_zeros(loc_xs.shape, dtype=torch.uint8)
beg = 0
for level, num_loc in enumerate(num_loc_list):
end = beg + num_loc
stride = self.strides[level] * self.radius
xmin = center_x[beg:end] - stride
ymin = center_y[beg:end] - stride
xmax = center_x[beg:end] + stride
ymax = center_y[beg:end] + stride
# limit sample region in gt
center_gt[beg:end, :, 0] = torch.where(xmin > boxes[beg:end, :, 0], xmin, boxes[beg:end, :, 0])
center_gt[beg:end, :, 1] = torch.where(ymin > boxes[beg:end, :, 1], ymin, boxes[beg:end, :, 1])
center_gt[beg:end, :, 2] = torch.where(xmax > boxes[beg:end, :, 2], boxes[beg:end, :, 2], xmax)
center_gt[beg:end, :, 3] = torch.where(ymax > boxes[beg:end, :, 3], boxes[beg:end, :, 3], ymax)
beg = end
left = loc_xs[:, None] - center_gt[..., 0]
right = center_gt[..., 2] - loc_xs[:, None]
top = loc_ys[:, None] - center_gt[..., 1]
bottom = center_gt[..., 3] - loc_ys[:, None]
center_bbox = torch.stack((left, top, right, bottom), -1)
inside_gt_bbox_mask = center_bbox.min(-1)[0] > 0
return inside_gt_bbox_mask
def _transpose(self, training_targets, num_loc_list):
'''
Transpose image-first training targets into level-first ones.
:return: level-first training targets
'''
if isinstance(training_targets[0], Boxes3D):
for im_i in range(len(training_targets)):
# training_targets[im_i] = torch.split(training_targets[im_i], num_loc_list, dim=0)
training_targets[im_i] = training_targets[im_i].split(num_loc_list, dim=0)
targets_level_first = []
for targets_per_level in zip(*training_targets):
targets_level_first.append(Boxes3D.cat(targets_per_level, dim=0))
return targets_level_first
for im_i in range(len(training_targets)):
training_targets[im_i] = torch.split(training_targets[im_i], num_loc_list, dim=0)
targets_level_first = []
for targets_per_level in zip(*training_targets):
targets_level_first.append(torch.cat(targets_per_level, dim=0))
return targets_level_first
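# Example (minimal sketch, assumed shapes): the image-first -> level-first regrouping
# performed by `_transpose` above, shown with plain tensors instead of real targets.
def _example_transpose_image_first_to_level_first():
    num_loc_list = [6, 4]  # hypothetical number of locations on two FPN levels
    per_image = [torch.arange(10), torch.arange(10, 20)]  # two images, 10 locations each
    per_image = [t.split(num_loc_list, dim=0) for t in per_image]
    level_first = [torch.cat(lvl, dim=0) for lvl in zip(*per_image)]
    assert [len(x) for x in level_first] == [12, 8]  # 2 images x 6 locs, 2 images x 4 locs
    return level_first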
# Copyright 2021 Toyota Research Institute. All rights reserved.
from .image_list import ImageList
# Copyright 2021 Toyota Research Institute. All rights reserved.
import numpy as np
import torch
from pyquaternion import Quaternion
from torch.cuda import amp
from projects.mmdet3d_plugin.dd3d.utils.geometry import unproject_points2d
import projects.mmdet3d_plugin.dd3d.structures.transform3d as t3d
# yapf: disable
BOX3D_CORNER_MAPPING = [
[1, 1, 1, 1, -1, -1, -1, -1],
[1, -1, -1, 1, 1, -1, -1, 1],
[1, 1, -1, -1, 1, 1, -1, -1]
]
# yapf: enable
def quaternion_to_matrix(quaternions: torch.Tensor) -> torch.Tensor:
"""
Convert rotations given as quaternions to rotation matrices.
Args:
quaternions: quaternions with real part first,
as tensor of shape (..., 4).
Returns:
Rotation matrices as tensor of shape (..., 3, 3).
"""
r, i, j, k = torch.unbind(quaternions, -1)
two_s = 2.0 / (quaternions * quaternions).sum(-1)
o = torch.stack(
(
1 - two_s * (j * j + k * k),
two_s * (i * j - k * r),
two_s * (i * k + j * r),
two_s * (i * j + k * r),
1 - two_s * (i * i + k * k),
two_s * (j * k - i * r),
two_s * (i * k - j * r),
two_s * (j * k + i * r),
1 - two_s * (i * i + j * j),
),
-1,
)
return o.reshape(quaternions.shape[:-1] + (3, 3))
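# Example (minimal sketch): the identity quaternion (w=1, x=y=z=0) should map to the
# 3x3 identity rotation matrix; the values are illustrative only.
def _example_quaternion_to_matrix_identity():
    eye = quaternion_to_matrix(torch.tensor([[1., 0., 0., 0.]]))
    assert torch.allclose(eye, torch.eye(3).unsqueeze(0))
    return eye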
def _to_tensor(x, dim):
if isinstance(x, torch.Tensor):
x = x.to(torch.float32)
elif isinstance(x, np.ndarray) or isinstance(x, list) or isinstance(x, tuple):
x = torch.tensor(x, dtype=torch.float32)
elif isinstance(x, Quaternion):
x = torch.tensor(x.elements, dtype=torch.float32)
else:
raise ValueError(f"Unsupported type: {type(x).__name__}")
if x.ndim == 1:
x = x.reshape(-1, dim)
elif x.ndim > 2:
raise ValueError(f"Invalid shape of input: {x.shape.__str__()}")
return x
class GenericBoxes3D():
def __init__(self, quat, tvec, size):
self.quat = _to_tensor(quat, dim=4)
self._tvec = _to_tensor(tvec, dim=3)
self.size = _to_tensor(size, dim=3)
@property
def tvec(self):
return self._tvec
@property
@amp.autocast(enabled=False)
def corners(self):
allow_tf32 = torch.backends.cuda.matmul.allow_tf32
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
translation = t3d.Translate(self.tvec, device=self.device)
R = quaternion_to_matrix(self.quat)
rotation = t3d.Rotate(R=R.transpose(1, 2), device=self.device) # Need to transpose to make it work.
tfm = rotation.compose(translation)
_corners = 0.5 * self.quat.new_tensor(BOX3D_CORNER_MAPPING).T
# corners_in_obj_frame = self.size.unsqueeze(1) * _corners.unsqueeze(0)
lwh = self.size[:, [1, 0, 2]] # wlh -> lwh
corners_in_obj_frame = lwh.unsqueeze(1) * _corners.unsqueeze(0)
corners3d = tfm.transform_points(corners_in_obj_frame)
torch.backends.cuda.matmul.allow_tf32 = allow_tf32
torch.backends.cudnn.allow_tf32 = allow_tf32
return corners3d
@classmethod
def from_vectors(cls, vecs, device="cpu"):
"""
Parameters
----------
vecs: Iterable[np.ndarray]
Iterable of 10D pose representation.
intrinsics: np.ndarray
(3, 3) intrinsics matrix.
"""
quats, tvecs, sizes = [], [], []
for vec in vecs:
quat = vec[:4]
tvec = vec[4:7]
size = vec[7:]
quats.append(quat)
tvecs.append(tvec)
sizes.append(size)
quats = torch.as_tensor(quats, dtype=torch.float32, device=device)
tvecs = torch.as_tensor(tvecs, dtype=torch.float32, device=device)
sizes = torch.as_tensor(sizes, dtype=torch.float32, device=device)
return cls(quats, tvecs, sizes)
@classmethod
def cat(cls, boxes_list, dim=0):
assert isinstance(boxes_list, (list, tuple))
if len(boxes_list) == 0:
return cls(torch.empty(0), torch.empty(0), torch.empty(0))
assert all([isinstance(box, GenericBoxes3D) for box in boxes_list])
# use torch.cat (v.s. layers.cat) so the returned boxes never share storage with input
quat = torch.cat([b.quat for b in boxes_list], dim=dim)
tvec = torch.cat([b.tvec for b in boxes_list], dim=dim)
size = torch.cat([b.size for b in boxes_list], dim=dim)
cat_boxes = cls(quat, tvec, size)
return cat_boxes
def split(self, split_sizes, dim=0):
assert sum(split_sizes) == len(self)
quat_list = torch.split(self.quat, split_sizes, dim=dim)
tvec_list = torch.split(self.tvec, split_sizes, dim=dim)
size_list = torch.split(self.size, split_sizes, dim=dim)
return [GenericBoxes3D(*x) for x in zip(quat_list, tvec_list, size_list)]
def __getitem__(self, item):
"""
"""
if isinstance(item, int):
return GenericBoxes3D(self.quat[item].view(1, -1), self.tvec[item].view(1, -1), self.size[item].view(1, -1))
quat = self.quat[item]
tvec = self.tvec[item]
size = self.size[item]
assert quat.dim() == 2, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
assert tvec.dim() == 2, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
assert size.dim() == 2, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
return GenericBoxes3D(quat, tvec, size)
def __len__(self):
assert len(self.quat) == len(self.tvec) == len(self.size)
return self.quat.shape[0]
def clone(self):
"""
"""
return GenericBoxes3D(self.quat.clone(), self.tvec.clone(), self.size.clone())
def vectorize(self):
xyz = self.tvec
return torch.cat([self.quat, xyz, self.size], dim=1)
@property
def device(self):
return self.quat.device
def to(self, *args, **kwargs):
quat = self.quat.to(*args, **kwargs)
tvec = self.tvec.to(*args, **kwargs)
size = self.size.to(*args, **kwargs)
return GenericBoxes3D(quat, tvec, size)
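# Example (minimal sketch): constructing a single axis-aligned box and reading its 8
# corners. The size ordering (W, L, H) is assumed from the "wlh -> lwh" comment in
# `corners`; all numbers are illustrative.
def _example_generic_boxes3d_corners():
    box = GenericBoxes3D(
        quat=[1., 0., 0., 0.],  # identity rotation (wxyz)
        tvec=[0., 0., 0.],      # box center at the origin
        size=[2., 4., 1.5],     # assumed (W, L, H)
    )
    corners = box.corners
    assert corners.shape == (1, 8, 3)
    return corners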
class Boxes3D(GenericBoxes3D):
"""Vision-based 3D box container.
The tvec is computed from projected center, depth, and intrinsics.
"""
def __init__(self, quat, proj_ctr, depth, size, inv_intrinsics):
self.quat = quat
self.proj_ctr = proj_ctr
self.depth = depth
self.size = size
self.inv_intrinsics = inv_intrinsics
@property
def tvec(self):
ray = unproject_points2d(self.proj_ctr, self.inv_intrinsics)
xyz = ray * self.depth
return xyz
@classmethod
def from_vectors(cls, vecs, intrinsics, device="cpu"):
"""
Parameters
----------
vecs: Iterable[np.ndarray]
Iterable of 10D pose representation.
intrinsics: np.ndarray
(3, 3) intrinsics matrix.
"""
if len(vecs) == 0:
quats = torch.as_tensor([], dtype=torch.float32, device=device).view(-1, 4)
proj_ctrs = torch.as_tensor([], dtype=torch.float32, device=device).view(-1, 2)
depths = torch.as_tensor([], dtype=torch.float32, device=device).view(-1, 1)
sizes = torch.as_tensor([], dtype=torch.float32, device=device).view(-1, 3)
inv_intrinsics = torch.as_tensor([], dtype=torch.float32, device=device).view(-1, 3, 3)
return cls(quats, proj_ctrs, depths, sizes, inv_intrinsics)
quats, proj_ctrs, depths, sizes = [], [], [], []
for vec in vecs:
quat = vec[:4]
proj_ctr = intrinsics.dot(vec[4:7])
proj_ctr = proj_ctr[:2] / proj_ctr[-1]
depth = vec[6:7]
size = vec[7:]
quats.append(quat)
proj_ctrs.append(proj_ctr)
depths.append(depth)
sizes.append(size)
quats = torch.as_tensor(np.array(quats), dtype=torch.float32, device=device)
proj_ctrs = torch.as_tensor(np.array(proj_ctrs), dtype=torch.float32, device=device)
depths = torch.as_tensor(np.array(depths), dtype=torch.float32, device=device)
sizes = torch.as_tensor(np.array(sizes), dtype=torch.float32, device=device)
inv_intrinsics = np.linalg.inv(intrinsics)
inv_intrinsics = torch.as_tensor(inv_intrinsics[None, ...], device=device).expand(len(vecs), 3, 3)
return cls(quats, proj_ctrs, depths, sizes, inv_intrinsics)
@classmethod
def cat(cls, boxes_list, dim=0):
assert isinstance(boxes_list, (list, tuple))
if len(boxes_list) == 0:
return cls(torch.empty(0), torch.empty(0), torch.empty(0), torch.empty(0), torch.empty(0))
assert all([isinstance(box, Boxes3D) for box in boxes_list])
# use torch.cat (v.s. layers.cat) so the returned boxes never share storage with input
quat = torch.cat([b.quat for b in boxes_list], dim=dim)
proj_ctr = torch.cat([b.proj_ctr for b in boxes_list], dim=dim)
depth = torch.cat([b.depth for b in boxes_list], dim=dim)
size = torch.cat([b.size for b in boxes_list], dim=dim)
inv_intrinsics = torch.cat([b.inv_intrinsics for b in boxes_list], dim=dim)
cat_boxes = cls(quat, proj_ctr, depth, size, inv_intrinsics)
return cat_boxes
def split(self, split_sizes, dim=0):
assert sum(split_sizes) == len(self)
quat_list = torch.split(self.quat, split_sizes, dim=dim)
proj_ctr_list = torch.split(self.proj_ctr, split_sizes, dim=dim)
depth_list = torch.split(self.depth, split_sizes, dim=dim)
size_list = torch.split(self.size, split_sizes, dim=dim)
inv_K_list = torch.split(self.inv_intrinsics, split_sizes, dim=dim)
return [Boxes3D(*x) for x in zip(quat_list, proj_ctr_list, depth_list, size_list, inv_K_list)]
def __getitem__(self, item):
"""
"""
if isinstance(item, int):
return Boxes3D(
self.quat[item].view(1, -1), self.proj_ctr[item].view(1, -1), self.depth[item].view(1, -1),
self.size[item].view(1, -1), self.inv_intrinsics[item].view(1, 3, 3)
)
quat = self.quat[item]
ctr = self.proj_ctr[item]
depth = self.depth[item]
size = self.size[item]
inv_K = self.inv_intrinsics[item]
assert quat.dim() == 2, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
assert ctr.dim() == 2, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
assert depth.dim() == 2, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
assert size.dim() == 2, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
assert inv_K.dim() == 3, "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
assert inv_K.shape[1:] == (3, 3), "Indexing on Boxes3D with {} failed to return a matrix!".format(item)
return Boxes3D(quat, ctr, depth, size, inv_K)
def __len__(self):
assert len(self.quat) == len(self.proj_ctr) == len(self.depth) == len(self.size) == len(self.inv_intrinsics)
return self.quat.shape[0]
def clone(self):
"""
"""
return Boxes3D(
self.quat.clone(), self.proj_ctr.clone(), self.depth.clone(), self.size.clone(), self.inv_intrinsics.clone()
)
def to(self, *args, **kwargs):
quat = self.quat.to(*args, **kwargs)
proj_ctr = self.proj_ctr.to(*args, **kwargs)
depth = self.depth.to(*args, **kwargs)
size = self.size.to(*args, **kwargs)
inv_K = self.inv_intrinsics.to(*args, **kwargs)
return Boxes3D(quat, proj_ctr, depth, size, inv_K)
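# Example (minimal sketch): a 10D vector (quat wxyz, tvec xyz, size) round-trips through
# `Boxes3D.from_vectors`; the `tvec` property recovers the original translation from the
# projected center, depth, and inverse intrinsics. The intrinsics values are hypothetical.
def _example_boxes3d_from_vectors():
    K = np.float32([[500., 0., 320.], [0., 500., 240.], [0., 0., 1.]])
    vec = np.float32([1., 0., 0., 0., 1., 2., 10., 2., 4., 1.5])
    boxes = Boxes3D.from_vectors([vec], K)
    assert torch.allclose(boxes.tvec, torch.tensor([[1., 2., 10.]]), atol=1e-4)
    return boxes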
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
# Copyright 2021 Toyota Research Institute. All rights reserved.
from __future__ import division
from typing import Any, List, Sequence, Tuple
import torch
from torch import device
from torch.nn import functional as F
from detectron2.utils.env import TORCH_VERSION
def _as_tensor(x: Tuple[int, int]) -> torch.Tensor:
"""
An equivalent of `torch.as_tensor`, but works under tracing if input
is a list of tensor. `torch.as_tensor` will record a constant in tracing,
but this function will use `torch.stack` instead.
"""
if torch.jit.is_scripting():
return torch.as_tensor(x)
if isinstance(x, (list, tuple)) and all([isinstance(t, torch.Tensor) for t in x]):
return torch.stack(x)
return torch.as_tensor(x)
class ImageList(object):
"""
Adapted from detectron2:
https://github.com/facebookresearch/detectron2/blob/master/detectron2/structures/image_list.py
Key differences:
- add optional intrinsics
- add optional image path (useful for debugging)
==================================================================================================================
Structure that holds a list of images (of possibly
varying sizes) as a single tensor.
This works by padding the images to the same size,
and storing in a field the original sizes of each image.
Attributes:
image_sizes (list[tuple[int, int]]): each tuple is (h, w)
"""
def __init__(self, tensor: torch.Tensor, image_sizes: List[Tuple[int, int]], intrinsics=None, image_paths=None):
"""
Arguments:
tensor (Tensor): of shape (N, H, W) or (N, C_1, ..., C_K, H, W) where K >= 1
image_sizes (list[tuple[int, int]]): Each tuple is (h, w). It can
be smaller than (H, W) due to padding.
"""
self.tensor = tensor
self.image_sizes = image_sizes
self._intrinsics = intrinsics
self._image_paths = image_paths
@property
def intrinsics(self):
if torch.allclose(self._intrinsics[0], torch.eye(3, device=self._intrinsics.device)):
# TODO: torch.inverse(images.intrinsics) often returns identity when it shouldn't. Is it a pytorch bug?
raise ValueError("Intrinsics is Identity.")
return self._intrinsics
@property
def image_paths(self):
return self._image_paths
def __len__(self) -> int:
return len(self.image_sizes)
def __getitem__(self, idx) -> torch.Tensor:
"""
Access the individual image in its original size.
Args:
idx: int or slice
Returns:
Tensor: an image of shape (H, W) or (C_1, ..., C_K, H, W) where K >= 1
"""
size = self.image_sizes[idx]
return self.tensor[idx, ..., :size[0], :size[1]]
@torch.jit.unused
def to(self, *args: Any, **kwargs: Any) -> "ImageList":
cast_tensor = self.tensor.to(*args, **kwargs)
return ImageList(cast_tensor, self.image_sizes, intrinsics=self.intrinsics)
@property
def device(self) -> device:
return self.tensor.device
@staticmethod
def from_tensors(
tensors: List[torch.Tensor],
size_divisibility: int = 0,
pad_value: float = 0.0,
intrinsics=None,
image_paths=None
) -> "ImageList":
"""
Args:
tensors: a tuple or list of `torch.Tensor`, each of shape (Hi, Wi) or
(C_1, ..., C_K, Hi, Wi) where K >= 1. The Tensors will be padded
to the same shape with `pad_value`.
size_divisibility (int): If `size_divisibility > 0`, add padding to ensure
the common height and width is divisible by `size_divisibility`.
This depends on the model and many models need a divisibility of 32.
pad_value (float): value to pad
Returns:
an `ImageList`.
"""
assert len(tensors) > 0
assert isinstance(tensors, (tuple, list))
for t in tensors:
assert isinstance(t, torch.Tensor), type(t)
assert t.shape[:-2] == tensors[0].shape[:-2], t.shape
image_sizes = [(im.shape[-2], im.shape[-1]) for im in tensors]
image_sizes_tensor = [_as_tensor(x) for x in image_sizes]
max_size = torch.stack(image_sizes_tensor).max(0).values
if size_divisibility > 1:
stride = size_divisibility
# the last two dims are H,W, both subject to divisibility requirement
max_size = torch.div(max_size + (stride - 1), stride, rounding_mode='floor') * stride
# handle weirdness of scripting and tracing ...
if torch.jit.is_scripting():
max_size: List[int] = max_size.to(dtype=torch.long).tolist()
else:
# https://github.com/pytorch/pytorch/issues/42448
if TORCH_VERSION >= (1, 7) and torch.jit.is_tracing():
image_sizes = image_sizes_tensor
if len(tensors) == 1:
# This seems slightly (2%) faster.
# TODO: check whether it's faster for multiple images as well
image_size = image_sizes[0]
padding_size = [0, max_size[-1] - image_size[1], 0, max_size[-2] - image_size[0]]
batched_imgs = F.pad(tensors[0], padding_size, value=pad_value).unsqueeze_(0)
else:
# max_size can be a tensor in tracing mode, therefore convert to list
batch_shape = [len(tensors)] + list(tensors[0].shape[:-2]) + list(max_size)
batched_imgs = tensors[0].new_full(batch_shape, pad_value)
for img, pad_img in zip(tensors, batched_imgs):
pad_img[..., :img.shape[-2], :img.shape[-1]].copy_(img)
if intrinsics is not None:
assert isinstance(intrinsics, (tuple, list))
assert len(intrinsics) == len(tensors)
intrinsics = torch.stack(intrinsics, dim=0)
if image_paths is not None:
assert len(image_paths) == len(tensors)
return ImageList(batched_imgs.contiguous(), image_sizes, intrinsics, image_paths)
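# Example (minimal sketch, illustrative sizes): two images of different spatial sizes are
# padded into one batched tensor whose height/width are rounded up to a multiple of 32.
def _example_image_list_from_tensors():
    imgs = [torch.zeros(3, 100, 200), torch.zeros(3, 120, 160)]
    image_list = ImageList.from_tensors(imgs, size_divisibility=32)
    assert image_list.tensor.shape == (2, 3, 128, 224)
    assert image_list.image_sizes == [(100, 200), (120, 160)]
    return image_list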
# Copyright 2021 Toyota Research Institute. All rights reserved.
import numpy as np
from pyquaternion import Quaternion
class Pose:
"""SE(3) rigid transform class that allows compounding of 6-DOF poses
and provides transformations commonly used in geometric problems.
"""
def __init__(self, wxyz=np.float32([1., 0., 0., 0.]), tvec=np.float32([0., 0., 0.])):
"""Initialize a Pose with Quaternion and 3D Position
Parameters
----------
wxyz: np.float32 or Quaternion (default: np.float32([1,0,0,0]))
Quaternion/Rotation (wxyz)
tvec: np.float32 (default: np.float32([0,0,0]))
Translation (xyz)
"""
assert isinstance(wxyz, (np.ndarray, Quaternion))
assert isinstance(tvec, np.ndarray)
if isinstance(wxyz, np.ndarray):
assert np.abs(1.0 - np.linalg.norm(wxyz)) < 1.0e-3
self.quat = Quaternion(wxyz)
self.tvec = tvec
def __repr__(self):
formatter = {'float_kind': lambda x: '%.2f' % x}
tvec_str = np.array2string(self.tvec, formatter=formatter)
return 'wxyz: {}, tvec: ({})'.format(self.quat, tvec_str)
def copy(self):
"""Return a copy of this pose object.
Returns
----------
result: Pose
Copied pose object.
"""
return self.__class__(Quaternion(self.quat), self.tvec.copy())
def __mul__(self, other):
"""Left-multiply Pose with another Pose or 3D-Points.
Parameters
----------
other: Pose or np.ndarray
1. Pose: Identical to oplus operation.
(i.e. self_pose * other_pose)
2. ndarray: transform [N x 3] point set
(i.e. X' = self_pose * X)
Returns
----------
result: Pose or np.ndarray
Transformed pose or point cloud
"""
if isinstance(other, Pose):
assert isinstance(other, self.__class__)
t = self.quat.rotate(other.tvec) + self.tvec
q = self.quat * other.quat
return self.__class__(q, t)
elif isinstance(other, np.ndarray):
assert other.shape[-1] == 3, 'Point cloud is not 3-dimensional'
X = np.hstack([other, np.ones((len(other), 1))]).T
return (np.dot(self.matrix, X).T)[:, :3]
else:
return NotImplemented
def __rmul__(self, other):
raise NotImplementedError('Right multiply not implemented yet!')
def inverse(self):
"""Returns a new Pose that corresponds to the
inverse of this one.
Returns
----------
result: Pose
Inverted pose
"""
qinv = self.quat.inverse
return self.__class__(qinv, qinv.rotate(-self.tvec))
@property
def matrix(self):
"""Returns a 4x4 homogeneous matrix of the form [R t; 0 1]
Returns
----------
result: np.ndarray
4x4 homogeneous matrix
"""
result = self.quat.transformation_matrix
result[:3, 3] = self.tvec
return result
@property
def rotation_matrix(self):
"""Returns the 3x3 rotation matrix (R)
Returns
----------
result: np.ndarray
3x3 rotation matrix
"""
result = self.quat.transformation_matrix
return result[:3, :3]
@property
def rotation(self):
"""Return the rotation component of the pose as a Quaternion object.
Returns
----------
self.quat: Quaternion
Rotation component of the Pose object.
"""
return self.quat
@property
def translation(self):
"""Return the translation component of the pose as a np.ndarray.
Returns
----------
self.tvec: np.ndarray
Translation component of the Pose object.
"""
return self.tvec
@classmethod
def from_matrix(cls, transformation_matrix):
"""Initialize pose from 4x4 transformation matrix
Parameters
----------
transformation_matrix: np.ndarray
4x4 containing rotation/translation
Returns
-------
Pose
"""
return cls(wxyz=Quaternion(matrix=transformation_matrix[:3, :3]), tvec=np.float32(transformation_matrix[:3, 3]))
@classmethod
def from_rotation_translation(cls, rotation_matrix, tvec):
"""Initialize pose from rotation matrix and translation vector.
Parameters
----------
rotation_matrix : np.ndarray
3x3 rotation matrix
tvec : np.ndarray
length-3 translation vector
"""
return cls(wxyz=Quaternion(matrix=rotation_matrix), tvec=np.float64(tvec))
def __eq__(self, other):
return self.quat == other.quat and (self.tvec == other.tvec).all()
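# Example (minimal sketch): composing a pose with its inverse gives (approximately) the
# identity transform; the rotation/translation values are illustrative.
def _example_pose_inverse_roundtrip():
    pose = Pose(
        wxyz=Quaternion(axis=[0., 0., 1.], degrees=90.),  # 90 degree yaw
        tvec=np.float32([1., 2., 3.]),
    )
    identity = pose * pose.inverse()
    assert np.allclose(identity.matrix, np.eye(4), atol=1e-6)
    return identity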
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import math
import warnings
from typing import List, Optional, Union
import torch
Device = Union[str, torch.device]
def make_device(device: Device) -> torch.device:
"""
Makes an actual torch.device object from the device specified as
either a string or torch.device object. If the device is `cuda` without
a specific index, the index of the current device is assigned.
Args:
device: Device (as str or torch.device)
Returns:
A matching torch.device object
"""
device = torch.device(device) if isinstance(device, str) else device
if device.type == "cuda" and device.index is None: # pyre-ignore[16]
# If cuda but with no index, then the current cuda device is indicated.
# In that case, we fix to that device
device = torch.device(f"cuda:{torch.cuda.current_device()}")
return device
def get_device(x, device: Optional[Device] = None) -> torch.device:
"""
Gets the device of the specified variable x if it is a tensor, or
falls back to a default CPU device otherwise. Allows overriding by
providing an explicit device.
Args:
x: a torch.Tensor to get the device from or another type
device: Device (as str or torch.device) to fall back to
Returns:
A matching torch.device object
"""
# User overrides device
if device is not None:
return make_device(device)
# Set device based on input tensor
if torch.is_tensor(x):
return x.device
# Default device is cpu
return torch.device("cpu")
def _safe_det_3x3(t: torch.Tensor):
"""
Fast determinant calculation for a batch of 3x3 matrices.
Note, result of this function might not be the same as `torch.det()`.
The differences might be in the last significant digit.
Args:
t: Tensor of shape (N, 3, 3).
Returns:
Tensor of shape (N) with determinants.
"""
det = (
t[..., 0, 0] * (t[..., 1, 1] * t[..., 2, 2] - t[..., 1, 2] * t[..., 2, 1])
- t[..., 0, 1] * (t[..., 1, 0] * t[..., 2, 2] - t[..., 2, 0] * t[..., 1, 2])
+ t[..., 0, 2] * (t[..., 1, 0] * t[..., 2, 1] - t[..., 2, 0] * t[..., 1, 1])
)
return det
def _axis_angle_rotation(axis: str, angle: torch.Tensor) -> torch.Tensor:
"""
Return the rotation matrices for rotations about a single coordinate axis
(as used in Euler-angle conventions), for each value of the angle given.
Args:
axis: Axis label "X", "Y", or "Z".
angle: any shape tensor of Euler angles in radians
Returns:
Rotation matrices as tensor of shape (..., 3, 3).
"""
cos = torch.cos(angle)
sin = torch.sin(angle)
one = torch.ones_like(angle)
zero = torch.zeros_like(angle)
if axis == "X":
R_flat = (one, zero, zero, zero, cos, -sin, zero, sin, cos)
elif axis == "Y":
R_flat = (cos, zero, sin, zero, one, zero, -sin, zero, cos)
elif axis == "Z":
R_flat = (cos, -sin, zero, sin, cos, zero, zero, zero, one)
else:
raise ValueError("letter must be either X, Y or Z.")
return torch.stack(R_flat, -1).reshape(angle.shape + (3, 3))
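# Example (minimal sketch): `_axis_angle_rotation` follows the column-vector convention,
# so a +90 degree rotation about Z maps the x-axis onto the y-axis; values are illustrative.
def _example_axis_angle_rotation():
    R = _axis_angle_rotation("Z", torch.tensor(math.pi / 2))
    x_axis = torch.tensor([1.0, 0.0, 0.0])
    assert torch.allclose(R @ x_axis, torch.tensor([0.0, 1.0, 0.0]), atol=1e-6)
    return R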
class Transform3d:
"""
A Transform3d object encapsulates a batch of N 3D transformations, and knows
how to transform points and normal vectors. Suppose that t is a Transform3d;
then we can do the following:
.. code-block:: python
N = len(t)
points = torch.randn(N, P, 3)
normals = torch.randn(N, P, 3)
points_transformed = t.transform_points(points) # => (N, P, 3)
normals_transformed = t.transform_normals(normals) # => (N, P, 3)
BROADCASTING
Transform3d objects supports broadcasting. Suppose that t1 and tN are
Transform3d objects with len(t1) == 1 and len(tN) == N respectively. Then we
can broadcast transforms like this:
.. code-block:: python
t1.transform_points(torch.randn(P, 3)) # => (P, 3)
t1.transform_points(torch.randn(1, P, 3)) # => (1, P, 3)
t1.transform_points(torch.randn(M, P, 3)) # => (M, P, 3)
tN.transform_points(torch.randn(P, 3)) # => (N, P, 3)
tN.transform_points(torch.randn(1, P, 3)) # => (N, P, 3)
COMBINING TRANSFORMS
Transform3d objects can be combined in two ways: composing and stacking.
Composing is function composition. Given Transform3d objects t1, t2, t3,
the following all compute the same thing:
.. code-block:: python
y1 = t3.transform_points(t2.transform_points(t1.transform_points(x)))
y2 = t1.compose(t2).compose(t3).transform_points(x)
y3 = t1.compose(t2, t3).transform_points(x)
Composing transforms should broadcast.
.. code-block:: python
if len(t1) == 1 and len(t2) == N, then len(t1.compose(t2)) == N.
We can also stack a sequence of Transform3d objects, which represents
composition along the batch dimension; then the following should compute the
same thing.
.. code-block:: python
N, M = len(tN), len(tM)
xN = torch.randn(N, P, 3)
xM = torch.randn(M, P, 3)
y1 = torch.cat([tN.transform_points(xN), tM.transform_points(xM)], dim=0)
y2 = tN.stack(tM).transform_points(torch.cat([xN, xM], dim=0))
BUILDING TRANSFORMS
We provide convenience methods for easily building Transform3d objects
as compositions of basic transforms.
.. code-block:: python
# Scale by 0.5, then translate by (1, 2, 3)
t1 = Transform3d().scale(0.5).translate(1, 2, 3)
# Scale each axis by a different amount, then translate, then scale
t2 = Transform3d().scale(1, 3, 3).translate(2, 3, 1).scale(2.0)
t3 = t1.compose(t2)
tN = t1.stack(t3, t3)
BACKPROP THROUGH TRANSFORMS
When building transforms, we can also parameterize them by Torch tensors;
in this case we can backprop through the construction and application of
Transform objects, so they could be learned via gradient descent or
predicted by a neural network.
.. code-block:: python
s1_params = torch.randn(N, requires_grad=True)
t_params = torch.randn(N, 3, requires_grad=True)
s2_params = torch.randn(N, 3, requires_grad=True)
t = Transform3d().scale(s1_params).translate(t_params).scale(s2_params)
x = torch.randn(N, 3)
y = t.transform_points(x)
loss = compute_loss(y)
loss.backward()
with torch.no_grad():
s1_params -= lr * s1_params.grad
t_params -= lr * t_params.grad
s2_params -= lr * s2_params.grad
CONVENTIONS
We adopt a right-hand coordinate system, meaning that rotation about an axis
with a positive angle results in a counter clockwise rotation.
This class assumes that transformations are applied on inputs which
are row vectors. The internal representation of the Nx4x4 transformation
matrix is of the form:
.. code-block:: python
M = [
[Rxx, Ryx, Rzx, 0],
[Rxy, Ryy, Rzy, 0],
[Rxz, Ryz, Rzz, 0],
[Tx, Ty, Tz, 1],
]
To apply the transformation to points which are row vectors, the M matrix
can be pre multiplied by the points:
.. code-block:: python
points = [[0, 1, 2]] # (1 x 3) xyz coordinates of a point
transformed_points = points * M
"""
def __init__(
self,
dtype: torch.dtype = torch.float32,
device: Device = "cpu",
matrix: Optional[torch.Tensor] = None,
) -> None:
"""
Args:
dtype: The data type of the transformation matrix,
to be used if `matrix = None`.
device: The device for storing the implemented transformation.
If `matrix != None`, uses the device of input `matrix`.
matrix: A tensor of shape (4, 4) or of shape (minibatch, 4, 4)
representing the 4x4 3D transformation matrix.
If `None`, initializes with identity using
the specified `device` and `dtype`.
"""
if matrix is None:
self._matrix = torch.eye(4, dtype=dtype, device=device).view(1, 4, 4)
else:
if matrix.ndim not in (2, 3):
raise ValueError('"matrix" has to be a 2- or a 3-dimensional tensor.')
if matrix.shape[-2] != 4 or matrix.shape[-1] != 4:
raise ValueError(
'"matrix" has to be a tensor of shape (minibatch, 4, 4)'
)
# set dtype and device from matrix
dtype = matrix.dtype
device = matrix.device
self._matrix = matrix.view(-1, 4, 4)
self._transforms = [] # store transforms to compose
self._lu = None
self.device = make_device(device)
self.dtype = dtype
def __len__(self) -> int:
return self.get_matrix().shape[0]
def __getitem__(
self, index: Union[int, List[int], slice, torch.Tensor]
) -> "Transform3d":
"""
Args:
index: Specifying the index of the transform to retrieve.
Can be an int, slice, list of ints, boolean, long tensor.
Supports negative indices.
Returns:
Transform3d object with selected transforms. The tensors are not cloned.
"""
if isinstance(index, int):
index = [index]
return self.__class__(matrix=self.get_matrix()[index])
def compose(self, *others: "Transform3d") -> "Transform3d":
"""
Return a new Transform3d representing the composition of self with the
given other transforms, which will be stored as an internal list.
Args:
*others: Any number of Transform3d objects
Returns:
A new Transform3d with the stored transforms
"""
out = Transform3d(dtype=self.dtype, device=self.device)
out._matrix = self._matrix.clone()
for other in others:
if not isinstance(other, Transform3d):
msg = "Only possible to compose Transform3d objects; got %s"
raise ValueError(msg % type(other))
out._transforms = self._transforms + list(others)
return out
def get_matrix(self) -> torch.Tensor:
"""
Return a matrix which is the result of composing this transform
with others stored in self.transforms. Where necessary transforms
are broadcast against each other.
For example, if self.transforms contains transforms t1, t2, and t3, and
given a set of points x, the following should be true:
.. code-block:: python
y1 = t1.compose(t2, t3).transform(x)
y2 = t3.transform(t2.transform(t1.transform(x)))
y1.get_matrix() == y2.get_matrix()
Returns:
A transformation matrix representing the composed inputs.
"""
composed_matrix = self._matrix.clone()
if len(self._transforms) > 0:
for other in self._transforms:
other_matrix = other.get_matrix()
composed_matrix = _broadcast_bmm(composed_matrix, other_matrix)
return composed_matrix
def _get_matrix_inverse(self) -> torch.Tensor:
"""
Return the inverse of self._matrix.
"""
return torch.inverse(self._matrix)
def inverse(self, invert_composed: bool = False) -> "Transform3d":
"""
Returns a new Transform3d object that represents an inverse of the
current transformation.
Args:
invert_composed:
- True: First compose the list of stored transformations
and then apply inverse to the result. This is
potentially slower for classes of transformations
with inverses that can be computed efficiently
(e.g. rotations and translations).
- False: Invert the individual stored transformations
independently without composing them.
Returns:
A new Transform3d object containing the inverse of the original
transformation.
"""
tinv = Transform3d(dtype=self.dtype, device=self.device)
if invert_composed:
# first compose then invert
tinv._matrix = torch.inverse(self.get_matrix())
else:
# self._get_matrix_inverse() implements efficient inverse
# of self._matrix
i_matrix = self._get_matrix_inverse()
# 2 cases:
if len(self._transforms) > 0:
# a) Either we have a non-empty list of transforms:
# Here we take self._matrix and append its inverse at the
# end of the reverted _transforms list. After composing
# the transformations with get_matrix(), this correctly
# right-multiplies by the inverse of self._matrix
# at the end of the composition.
tinv._transforms = [t.inverse() for t in reversed(self._transforms)]
last = Transform3d(dtype=self.dtype, device=self.device)
last._matrix = i_matrix
tinv._transforms.append(last)
else:
# b) Or there are no stored transformations
# we just set inverted matrix
tinv._matrix = i_matrix
return tinv
def stack(self, *others: "Transform3d") -> "Transform3d":
"""
Return a new batched Transform3d representing the batch elements from
self and all the given other transforms all batched together.
Args:
*others: Any number of Transform3d objects
Returns:
A new Transform3d.
"""
transforms = [self] + list(others)
matrix = torch.cat([t.get_matrix() for t in transforms], dim=0)
out = Transform3d(dtype=self.dtype, device=self.device)
out._matrix = matrix
return out
def transform_points(self, points, eps: Optional[float] = None) -> torch.Tensor:
"""
Use this transform to transform a set of 3D points. Assumes row major
ordering of the input points.
Args:
points: Tensor of shape (P, 3) or (N, P, 3)
eps: If eps!=None, the argument is used to clamp the
last coordinate before performing the final division.
The clamping corresponds to:
last_coord := (last_coord.sign() + (last_coord==0)) *
torch.clamp(last_coord.abs(), eps),
i.e. the last coordinates that are exactly 0 will
be clamped to +eps.
Returns:
points_out: points of shape (N, P, 3) or (P, 3) depending
on the dimensions of the transform
"""
points_batch = points.clone()
if points_batch.dim() == 2:
points_batch = points_batch[None] # (P, 3) -> (1, P, 3)
if points_batch.dim() != 3:
msg = "Expected points to have dim = 2 or dim = 3: got shape %r"
raise ValueError(msg % repr(points.shape))
N, P, _3 = points_batch.shape
ones = torch.ones(N, P, 1, dtype=points.dtype, device=points.device)
points_batch = torch.cat([points_batch, ones], dim=2)
composed_matrix = self.get_matrix()
points_out = _broadcast_bmm(points_batch, composed_matrix)
denom = points_out[..., 3:] # denominator
if eps is not None:
denom_sign = denom.sign() + (denom == 0.0).type_as(denom)
denom = denom_sign * torch.clamp(denom.abs(), eps)
points_out = points_out[..., :3] / denom
# When transform is (1, 4, 4) and points is (P, 3) return
# points_out of shape (P, 3)
if points_out.shape[0] == 1 and points.dim() == 2:
points_out = points_out.reshape(points.shape)
return points_out
def transform_normals(self, normals) -> torch.Tensor:
"""
Use this transform to transform a set of normal vectors.
Args:
normals: Tensor of shape (P, 3) or (N, P, 3)
Returns:
normals_out: Tensor of shape (P, 3) or (N, P, 3) depending
on the dimensions of the transform
"""
if normals.dim() not in [2, 3]:
msg = "Expected normals to have dim = 2 or dim = 3: got shape %r"
raise ValueError(msg % (normals.shape,))
composed_matrix = self.get_matrix()
# TODO: inverse is bad! Solve a linear system instead
mat = composed_matrix[:, :3, :3]
normals_out = _broadcast_bmm(normals, mat.transpose(1, 2).inverse())
# This doesn't pass unit tests. TODO investigate further
# if self._lu is None:
# self._lu = self._matrix[:, :3, :3].transpose(1, 2).lu()
# normals_out = normals.lu_solve(*self._lu)
# When transform is (1, 4, 4) and normals is (P, 3) return
# normals_out of shape (P, 3)
if normals_out.shape[0] == 1 and normals.dim() == 2:
normals_out = normals_out.reshape(normals.shape)
return normals_out
def translate(self, *args, **kwargs) -> "Transform3d":
return self.compose(
Translate(device=self.device, dtype=self.dtype, *args, **kwargs)
)
def scale(self, *args, **kwargs) -> "Transform3d":
return self.compose(
Scale(device=self.device, dtype=self.dtype, *args, **kwargs)
)
def rotate(self, *args, **kwargs) -> "Transform3d":
return self.compose(
Rotate(device=self.device, dtype=self.dtype, *args, **kwargs)
)
def rotate_axis_angle(self, *args, **kwargs) -> "Transform3d":
return self.compose(
RotateAxisAngle(device=self.device, dtype=self.dtype, *args, **kwargs)
)
def clone(self) -> "Transform3d":
"""
Deep copy of Transforms object. All internal tensors are cloned
individually.
Returns:
new Transforms object.
"""
other = Transform3d(dtype=self.dtype, device=self.device)
if self._lu is not None:
other._lu = [elem.clone() for elem in self._lu]
other._matrix = self._matrix.clone()
other._transforms = [t.clone() for t in self._transforms]
return other
def to(
self,
device: Device,
copy: bool = False,
dtype: Optional[torch.dtype] = None,
) -> "Transform3d":
"""
Match functionality of torch.Tensor.to()
If copy = True or the self Tensor is on a different device, the
returned tensor is a copy of self with the desired torch.device.
If copy = False and the self Tensor already has the correct torch.device,
then self is returned.
Args:
device: Device (as str or torch.device) for the new tensor.
copy: Boolean indicator whether or not to clone self. Default False.
dtype: If not None, casts the internal tensor variables
to a given torch.dtype.
Returns:
Transform3d object.
"""
device_ = make_device(device)
dtype_ = self.dtype if dtype is None else dtype
skip_to = self.device == device_ and self.dtype == dtype_
if not copy and skip_to:
return self
other = self.clone()
if skip_to:
return other
other.device = device_
other.dtype = dtype_
other._matrix = other._matrix.to(device=device_, dtype=dtype_)
other._transforms = [
t.to(device_, copy=copy, dtype=dtype_) for t in other._transforms
]
return other
def cpu(self) -> "Transform3d":
return self.to("cpu")
def cuda(self) -> "Transform3d":
return self.to("cuda")
class Translate(Transform3d):
def __init__(
self,
x,
y=None,
z=None,
dtype: torch.dtype = torch.float32,
device: Optional[Device] = None,
) -> None:
"""
Create a new Transform3d representing 3D translations.
Option I: Translate(xyz, dtype=torch.float32, device='cpu')
xyz should be a tensor of shape (N, 3)
Option II: Translate(x, y, z, dtype=torch.float32, device='cpu')
Here x, y, and z will be broadcast against each other and
concatenated to form the translation. Each can be:
- A python scalar
- A torch scalar
- A 1D torch tensor
"""
xyz = _handle_input(x, y, z, dtype, device, "Translate")
super().__init__(device=xyz.device, dtype=dtype)
N = xyz.shape[0]
mat = torch.eye(4, dtype=dtype, device=self.device)
mat = mat.view(1, 4, 4).repeat(N, 1, 1)
mat[:, 3, :3] = xyz
self._matrix = mat
def _get_matrix_inverse(self) -> torch.Tensor:
"""
Return the inverse of self._matrix.
"""
inv_mask = self._matrix.new_ones([1, 4, 4])
inv_mask[0, 3, :3] = -1.0
i_matrix = self._matrix * inv_mask
return i_matrix
class Scale(Transform3d):
def __init__(
self,
x,
y=None,
z=None,
dtype: torch.dtype = torch.float32,
device: Optional[Device] = None,
) -> None:
"""
A Transform3d representing a scaling operation, with different scale
factors along each coordinate axis.
Option I: Scale(s, dtype=torch.float32, device='cpu')
s can be one of
- Python scalar or torch scalar: Single uniform scale
- 1D torch tensor of shape (N,): A batch of uniform scale
- 2D torch tensor of shape (N, 3): Scale differently along each axis
Option II: Scale(x, y, z, dtype=torch.float32, device='cpu')
Each of x, y, and z can be one of
- python scalar
- torch scalar
- 1D torch tensor
"""
xyz = _handle_input(x, y, z, dtype, device, "scale", allow_singleton=True)
super().__init__(device=xyz.device, dtype=dtype)
N = xyz.shape[0]
# TODO: Can we do this all in one go somehow?
mat = torch.eye(4, dtype=dtype, device=self.device)
mat = mat.view(1, 4, 4).repeat(N, 1, 1)
mat[:, 0, 0] = xyz[:, 0]
mat[:, 1, 1] = xyz[:, 1]
mat[:, 2, 2] = xyz[:, 2]
self._matrix = mat
def _get_matrix_inverse(self) -> torch.Tensor:
"""
Return the inverse of self._matrix.
"""
xyz = torch.stack([self._matrix[:, i, i] for i in range(4)], dim=1)
ixyz = 1.0 / xyz
imat = torch.diag_embed(ixyz, dim1=1, dim2=2)
return imat
class Rotate(Transform3d):
def __init__(
self,
R: torch.Tensor,
dtype: torch.dtype = torch.float32,
device: Optional[Device] = None,
orthogonal_tol: float = 1e-5,
) -> None:
"""
Create a new Transform3d representing 3D rotation using a rotation
matrix as the input.
Args:
R: a tensor of shape (3, 3) or (N, 3, 3)
orthogonal_tol: tolerance for the test of the orthogonality of R
"""
device_ = get_device(R, device)
super().__init__(device=device_, dtype=dtype)
if R.dim() == 2:
R = R[None]
if R.shape[-2:] != (3, 3):
msg = "R must have shape (3, 3) or (N, 3, 3); got %s"
raise ValueError(msg % repr(R.shape))
R = R.to(device=device_, dtype=dtype)
_check_valid_rotation_matrix(R, tol=orthogonal_tol)
N = R.shape[0]
mat = torch.eye(4, dtype=dtype, device=device_)
mat = mat.view(1, 4, 4).repeat(N, 1, 1)
mat[:, :3, :3] = R
self._matrix = mat
def _get_matrix_inverse(self) -> torch.Tensor:
"""
Return the inverse of self._matrix.
"""
return self._matrix.permute(0, 2, 1).contiguous()
class RotateAxisAngle(Rotate):
def __init__(
self,
angle,
axis: str = "X",
degrees: bool = True,
dtype: torch.dtype = torch.float32,
device: Optional[Device] = None,
) -> None:
"""
Create a new Transform3d representing 3D rotation about an axis
by an angle.
Assuming a right-hand coordinate system, positive rotation angles result
in a counter clockwise rotation.
Args:
angle:
- A torch tensor of shape (N,)
- A python scalar
- A torch scalar
axis:
string: one of ["X", "Y", "Z"] indicating the axis about which
to rotate.
NOTE: All batch elements are rotated about the same axis.
"""
axis = axis.upper()
if axis not in ["X", "Y", "Z"]:
msg = "Expected axis to be one of ['X', 'Y', 'Z']; got %s"
raise ValueError(msg % axis)
angle = _handle_angle_input(angle, dtype, device, "RotateAxisAngle")
angle = (angle / 180.0 * math.pi) if degrees else angle
# We assume the points on which this transformation will be applied
# are row vectors. The rotation matrix returned from _axis_angle_rotation
# is for transforming column vectors. Therefore we transpose this matrix.
# R will always be of shape (N, 3, 3)
R = _axis_angle_rotation(axis, angle).transpose(1, 2)
super().__init__(device=angle.device, R=R, dtype=dtype)
def _handle_coord(c, dtype: torch.dtype, device: torch.device) -> torch.Tensor:
"""
Helper function for _handle_input.
Args:
c: Python scalar, torch scalar, or 1D torch tensor
Returns:
c_vec: 1D torch tensor
"""
if not torch.is_tensor(c):
c = torch.tensor(c, dtype=dtype, device=device)
if c.dim() == 0:
c = c.view(1)
if c.device != device or c.dtype != dtype:
c = c.to(device=device, dtype=dtype)
return c
def _handle_input(
x,
y,
z,
dtype: torch.dtype,
device: Optional[Device],
name: str,
allow_singleton: bool = False,
) -> torch.Tensor:
"""
Helper function to handle parsing logic for building transforms. The output
is always a tensor of shape (N, 3), but there are several types of allowed
input.
Case I: Single Matrix
In this case x is a tensor of shape (N, 3), and y and z are None. Here just
return x.
Case II: Vectors and Scalars
In this case each of x, y, and z can be one of the following
- Python scalar
- Torch scalar
- Torch tensor of shape (N, 1) or (1, 1)
In this case x, y and z are broadcast to tensors of shape (N, 1)
and concatenated to a tensor of shape (N, 3)
Case III: Singleton (only if allow_singleton=True)
In this case y and z are None, and x can be one of the following:
- Python scalar
- Torch scalar
- Torch tensor of shape (N, 1) or (1, 1)
Here x will be duplicated 3 times, and we return a tensor of shape (N, 3)
Returns:
xyz: Tensor of shape (N, 3)
"""
device_ = get_device(x, device)
# If x is actually a tensor of shape (N, 3) then just return it
if torch.is_tensor(x) and x.dim() == 2:
if x.shape[1] != 3:
msg = "Expected tensor of shape (N, 3); got %r (in %s)"
raise ValueError(msg % (x.shape, name))
if y is not None or z is not None:
msg = "Expected y and z to be None (in %s)" % name
raise ValueError(msg)
return x.to(device=device_, dtype=dtype)
if allow_singleton and y is None and z is None:
y = x
z = x
# Convert all to 1D tensors
xyz = [_handle_coord(c, dtype, device_) for c in [x, y, z]]
# Broadcast and concatenate
sizes = [c.shape[0] for c in xyz]
N = max(sizes)
for c in xyz:
if c.shape[0] != 1 and c.shape[0] != N:
msg = "Got non-broadcastable sizes %r (in %s)" % (sizes, name)
raise ValueError(msg)
xyz = [c.expand(N) for c in xyz]
xyz = torch.stack(xyz, dim=1)
return xyz
def _handle_angle_input(
x, dtype: torch.dtype, device: Optional[Device], name: str
) -> torch.Tensor:
"""
Helper function for building a rotation function using angles.
The output is always of shape (N,).
The input can be one of:
- Torch tensor of shape (N,)
- Python scalar
- Torch scalar
"""
device_ = get_device(x, device)
if torch.is_tensor(x) and x.dim() > 1:
msg = "Expected tensor of shape (N,); got %r (in %s)"
raise ValueError(msg % (x.shape, name))
else:
return _handle_coord(x, dtype, device_)
def _broadcast_bmm(a, b) -> torch.Tensor:
"""
Batch multiply two matrices and broadcast if necessary.
Args:
a: torch tensor of shape (P, K) or (M, P, K)
b: torch tensor of shape (N, K, K)
Returns:
a and b broadcast multiplied. The output batch dimension is max(N, M).
To broadcast transforms across a batch dimension if M != N then
expect that either M = 1 or N = 1. The tensor with batch dimension 1 is
expanded to have shape N or M.
"""
if a.dim() == 2:
a = a[None]
if len(a) != len(b):
if not ((len(a) == 1) or (len(b) == 1)):
msg = "Expected batch dim for bmm to be equal or 1; got %r, %r"
raise ValueError(msg % (a.shape, b.shape))
if len(a) == 1:
a = a.expand(len(b), -1, -1)
if len(b) == 1:
b = b.expand(len(a), -1, -1)
return a.bmm(b)
@torch.no_grad()
def _check_valid_rotation_matrix(R, tol: float = 1e-7) -> None:
"""
Determine if R is a valid rotation matrix by checking it satisfies the
following conditions:
``RR^T = I and det(R) = 1``
Args:
R: an (N, 3, 3) matrix
Returns:
None
Emits a warning if R is an invalid rotation matrix.
"""
N = R.shape[0]
eye = torch.eye(3, dtype=R.dtype, device=R.device)
eye = eye.view(1, 3, 3).expand(N, -1, -1)
orthogonal = torch.allclose(R.bmm(R.transpose(1, 2)), eye, atol=tol)
det_R = _safe_det_3x3(R)
no_distortion = torch.allclose(det_R, torch.ones_like(det_R))
if not (orthogonal and no_distortion):
msg = "R is not a valid rotation matrix"
warnings.warn(msg)
return
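# Example (minimal sketch): composing a rotation and a translation and applying the result
# to row-vector points, as described in the class docstring; all values are illustrative.
def _example_compose_and_transform():
    t = Transform3d().rotate_axis_angle(90.0, axis="Z").translate(1.0, 0.0, 0.0)
    points = torch.tensor([[[1.0, 0.0, 0.0]]])  # (N=1, P=1, 3)
    out = t.transform_points(points)
    assert torch.allclose(out, torch.tensor([[[1.0, 1.0, 0.0]]]), atol=1e-6)
    return out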
# Copyright 2021 Toyota Research Institute. All rights reserved.
import logging
from functools import wraps
import torch.distributed as dist
from detectron2.utils import comm as d2_comm
LOG = logging.getLogger(__name__)
_NESTED_BROADCAST_FROM_MASTER = False
def is_distributed():
return d2_comm.get_world_size() > 1
def broadcast_from_master(fn):
"""If distributed, only the master executes the function and broadcast the results to other workers.
Usage:
@broadcast_from_master
def foo(a, b): ...
"""
@wraps(fn)
def wrapper(*args, **kwargs): # pylint: disable=unused-argument
global _NESTED_BROADCAST_FROM_MASTER
if not is_distributed():
return fn(*args, **kwargs)
if _NESTED_BROADCAST_FROM_MASTER:
assert d2_comm.is_main_process()
LOG.warning(f"_NESTED_BROADCAST_FROM_MASTER = True, {fn.__name__}")
return fn(*args, **kwargs)
if d2_comm.is_main_process():
_NESTED_BROADCAST_FROM_MASTER = True
ret = [fn(*args, **kwargs), ]
_NESTED_BROADCAST_FROM_MASTER = False
else:
ret = [None, ]
if dist.is_initialized():
dist.broadcast_object_list(ret)
ret = ret[0]
assert ret is not None
return ret
return wrapper
def master_only(fn):
"""If distributed, only the master executes the function.
Usage:
@master_only
def foo(a, b): ...
"""
@wraps(fn)
def wrapped_fn(*args, **kwargs):
if d2_comm.is_main_process():
ret = fn(*args, **kwargs)
d2_comm.synchronize()
if d2_comm.is_main_process():
return ret
return wrapped_fn
def gather_dict(dikt):
"""Gather python dictionaries from all workers to the rank=0 worker.
Assumption: the keys of `dikt` are disjoint across all workers.
If rank == 0, return the aggregated dict.
If rank > 0, return `None`.
"""
dict_lst = d2_comm.gather(dikt, dst=0)
if d2_comm.is_main_process():
gathered_dict = {}
for dic in dict_lst:
for k in dic.keys():
assert k not in gathered_dict, f"Dictionary key overlaps: {k}"
gathered_dict.update(dic)
return gathered_dict
else:
return None
def reduce_sum(tensor):
"""
Adapted from AdelaiDet:
https://github.com/aim-uofa/AdelaiDet/blob/master/adet/utils/comm.py
"""
if not is_distributed():
return tensor
tensor = tensor.clone()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
return tensor
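# Example (minimal sketch): decorating a hypothetical helper so that, when distributed,
# only the rank-0 worker runs it and every worker receives the same result. In a
# single-process run the call falls through to the plain function.
def _example_broadcast_from_master():
    @broadcast_from_master
    def pick_validation_indices(num_items, num_samples):  # hypothetical helper
        import random
        return sorted(random.sample(range(num_items), num_samples))

    return pick_validation_indices(100, 5)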
# Copyright 2021 Toyota Research Institute. All rights reserved.
import logging
import cv2
import numpy as np
import torch
import torch.nn.functional as F
LOG = logging.getLogger(__name__)
PI = 3.14159265358979323846
EPS = 1e-7
def _sqrt_positive_part(x: torch.Tensor) -> torch.Tensor:
"""
Returns torch.sqrt(torch.max(0, x))
but with a zero subgradient where x is 0.
"""
ret = torch.zeros_like(x)
positive_mask = x > 0
ret[positive_mask] = torch.sqrt(x[positive_mask])
return ret
def matrix_to_quaternion(matrix: torch.Tensor) -> torch.Tensor:
"""
Convert rotations given as rotation matrices to quaternions.
Args:
matrix: Rotation matrices as tensor of shape (..., 3, 3).
Returns:
quaternions with real part first, as tensor of shape (..., 4).
"""
if matrix.size(-1) != 3 or matrix.size(-2) != 3:
raise ValueError(f"Invalid rotation matrix shape {matrix.shape}.")
batch_dim = matrix.shape[:-2]
m00, m01, m02, m10, m11, m12, m20, m21, m22 = torch.unbind(
matrix.reshape(batch_dim + (9,)), dim=-1
)
q_abs = _sqrt_positive_part(
torch.stack(
[
1.0 + m00 + m11 + m22,
1.0 + m00 - m11 - m22,
1.0 - m00 + m11 - m22,
1.0 - m00 - m11 + m22,
],
dim=-1,
)
)
# we produce the desired quaternion multiplied by each of r, i, j, k
quat_by_rijk = torch.stack(
[
torch.stack([q_abs[..., 0] ** 2, m21 - m12, m02 - m20, m10 - m01], dim=-1),
torch.stack([m21 - m12, q_abs[..., 1] ** 2, m10 + m01, m02 + m20], dim=-1),
torch.stack([m02 - m20, m10 + m01, q_abs[..., 2] ** 2, m12 + m21], dim=-1),
torch.stack([m10 - m01, m20 + m02, m21 + m12, q_abs[..., 3] ** 2], dim=-1),
],
dim=-2,
)
# We floor here at 0.1 but the exact level is not important; if q_abs is small,
# the candidate won't be picked.
flr = torch.tensor(0.1).to(dtype=q_abs.dtype, device=q_abs.device)
quat_candidates = quat_by_rijk / (2.0 * q_abs[..., None].max(flr))
# if not for numerical problems, quat_candidates[i] should be same (up to a sign),
# forall i; we pick the best-conditioned one (with the largest denominator)
return quat_candidates[
F.one_hot(q_abs.argmax(dim=-1), num_classes=4) > 0.5, : # pyre-ignore[16]
].reshape(batch_dim + (4,))
def quaternion_to_matrix(quaternions: torch.Tensor) -> torch.Tensor:
"""
Convert rotations given as quaternions to rotation matrices.
Args:
quaternions: quaternions with real part first,
as tensor of shape (..., 4).
Returns:
Rotation matrices as tensor of shape (..., 3, 3).
"""
r, i, j, k = torch.unbind(quaternions, -1)
two_s = 2.0 / (quaternions * quaternions).sum(-1)
o = torch.stack(
(
1 - two_s * (j * j + k * k),
two_s * (i * j - k * r),
two_s * (i * k + j * r),
two_s * (i * j + k * r),
1 - two_s * (i * i + k * k),
two_s * (j * k - i * r),
two_s * (i * k - j * r),
two_s * (j * k + i * r),
1 - two_s * (i * i + j * j),
),
-1,
)
return o.reshape(quaternions.shape[:-1] + (3, 3))
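# Hedged sanity-check sketch (not part of the original file): the two conversions
# above should round-trip up to quaternion sign. `_example_quaternion_roundtrip`
# is a hypothetical helper added for illustration only.
def _example_quaternion_roundtrip():
    q = torch.randn(8, 4)
    q = q / q.norm(dim=-1, keepdim=True)  # random unit quaternions, real part first
    q_rec = matrix_to_quaternion(quaternion_to_matrix(q))
    # Quaternions are defined up to sign; flip the recovered ones onto the originals.
    dot = (q * q_rec).sum(dim=-1, keepdim=True)
    q_rec = torch.where(dot < 0, -q_rec, q_rec)
    assert torch.allclose(q, q_rec, atol=1e-4)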
def allocentric_to_egocentric(quat, proj_ctr, inv_intrinsics):
"""
Parameters
----------
quat: Tensor
(N, 4). Batch of (allocentric) quaternions.
proj_ctr: Tensor
        (N, 2). Projected centers, xy coordinates.
    inv_intrinsics: Tensor
        (N, 3, 3). Inverted intrinsics.
"""
R_obj_to_local = quaternion_to_matrix(quat)
    # ray == z-axis in local orientation
ray = unproject_points2d(proj_ctr, inv_intrinsics)
z = ray / ray.norm(dim=1, keepdim=True)
    # Gram-Schmidt: local_y = global_y - (global_y . local_z) * local_z
y = z.new_tensor([[0., 1., 0.]]) - z[:, 1:2] * z
y = y / y.norm(dim=1, keepdim=True)
x = torch.cross(y, z, dim=1)
# local -> global
R_local_to_global = torch.stack([x, y, z], dim=-1)
# obj -> global
R_obj_to_global = torch.bmm(R_local_to_global, R_obj_to_local)
egocentric_quat = matrix_to_quaternion(R_obj_to_global)
# Make sure it's unit norm.
quat_norm = egocentric_quat.norm(dim=1, keepdim=True)
if not torch.allclose(quat_norm, torch.as_tensor(1.), atol=1e-3):
LOG.warning(
f"Some of the input quaternions are not unit norm: min={quat_norm.min()}, max={quat_norm.max()}; therefore normalizing."
)
egocentric_quat = egocentric_quat / quat_norm.clamp(min=EPS)
return egocentric_quat
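# Hedged usage sketch (not part of the original file): the intrinsics and projected
# centers below are hypothetical values, chosen only to show the expected shapes.
def _example_allocentric_to_egocentric():
    N = 4
    quat = torch.tensor([[1., 0., 0., 0.]]).repeat(N, 1)      # identity allocentric orientation
    proj_ctr = torch.tensor([[800., 300.]]).repeat(N, 1)      # projected box centers (pixels)
    K = torch.tensor([[[721., 0., 640.], [0., 721., 360.], [0., 0., 1.]]]).repeat(N, 1, 1)
    return allocentric_to_egocentric(quat, proj_ctr, torch.inverse(K))  # -> (N, 4)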
def homogenize_points(xy):
"""
Parameters
----------
xy: Tensor
xy coordinates. shape=(N, ..., 2)
E.g., (N, 2) or (N, K, 2) or (N, H, W, 2)
Returns
-------
Tensor:
1. is appended to the last dimension. shape=(N, ..., 3)
E.g, (N, 3) or (N, K, 3) or (N, H, W, 3).
"""
# NOTE: this seems to work for arbitrary number of dimensions of input
pad = torch.nn.ConstantPad1d(padding=(0, 1), value=1.)
return pad(xy)
def project_points3d(Xw, K):
_, C = Xw.shape
assert C == 3
uv, _ = cv2.projectPoints(
Xw, np.zeros((3, 1), dtype=np.float32), np.zeros(3, dtype=np.float32), K, np.zeros(5, dtype=np.float32)
)
return uv.reshape(-1, 2)
def unproject_points2d(points2d, inv_K, scale=1.0):
"""
Parameters
----------
points2d: Tensor
xy coordinates. shape=(N, ..., 2)
E.g., (N, 2) or (N, K, 2) or (N, H, W, 2)
inv_K: Tensor
Inverted intrinsics; shape=(N, 3, 3)
scale: float, default: 1.0
Scaling factor.
Returns
-------
Tensor:
Unprojected 3D point. shape=(N, ..., 3)
E.g., (N, 3) or (N, K, 3) or (N, H, W, 3)
"""
points2d = homogenize_points(points2d)
siz = points2d.size()
points2d = points2d.view(-1, 3).unsqueeze(-1) # (N, 3, 1)
unprojected = torch.matmul(inv_K, points2d) # (N, 3, 3) x (N, 3, 1) -> (N, 3, 1)
unprojected = unprojected.view(siz)
return unprojected * scale
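# Hedged usage sketch (not part of the original file): unprojecting the principal
# point yields a unit ray along +z. The intrinsics below are hypothetical.
def _example_unproject_points2d():
    K = torch.tensor([[[721., 0., 640.], [0., 721., 360.], [0., 0., 1.]]])
    ray = unproject_points2d(torch.tensor([[640., 360.]]), torch.inverse(K))
    assert torch.allclose(ray, torch.tensor([[0., 0., 1.]]), atol=1e-5)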
# Copyright 2021 Toyota Research Institute. All rights reserved.
from collections import OrderedDict
# from detectron2.config import configurable
class Task():
def __init__(self, name, is_detection_task, is_dense_prediction_task):
self.name = name
self.is_detection_task = is_detection_task
self.is_dense_prediction_task = is_dense_prediction_task
# yapf: disable
TASKS = [
Task(
name="box2d",
is_detection_task=True,
is_dense_prediction_task=False,
),
Task(
name="box3d",
is_detection_task=True,
is_dense_prediction_task=False,
),
Task(
name="depth",
is_detection_task=False,
is_dense_prediction_task=True,
)
]
# yapf: enable
NAME_TO_TASK = OrderedDict([(task.name, task) for task in TASKS])
class TaskManager():
#@configurable
def __init__(self, box2d_on=False, box3d_on=False, depth_on=False):
"""
configurable is experimental.
"""
self._box2d_on = self._mask2d_on = self._box3d_on = self._semseg2d_on = self._depth_on = False
tasks = []
if box2d_on:
tasks.append(NAME_TO_TASK['box2d'])
self._box2d_on = True
if box3d_on:
tasks.append(NAME_TO_TASK['box3d'])
self._box3d_on = True
if depth_on:
tasks.append(NAME_TO_TASK['depth'])
self._depth_on = True
if not tasks:
raise ValueError("No task specified.")
self._tasks = tasks
@property
def tasks(self):
return self._tasks
'''@classmethod
def from_config(cls, cfg):
# yapf: disable
return OrderedDict(
box2d_on = cfg.MODEL.BOX2D_ON,
box3d_on = cfg.MODEL.BOX3D_ON,
depth_on = cfg.MODEL.DEPTH_ON,
)
# yapf: enable'''
    # Indicators that tell whether each task is enabled.
@property
def box2d_on(self):
return self._box2d_on
@property
def box3d_on(self):
return self._box3d_on
@property
def depth_on(self):
return self._depth_on
@property
def has_dense_prediction_task(self):
return any([task.is_dense_prediction_task for task in self.tasks])
@property
def has_detection_task(self):
return any([task.is_detection_task for task in self.tasks])
@property
def task_names(self):
return [task.name for task in self.tasks]
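# Hedged usage sketch (not part of the original file): enable 2D and 3D detection
# but no dense depth prediction. The helper name is hypothetical.
def _example_task_manager():
    tm = TaskManager(box2d_on=True, box3d_on=True)
    assert tm.task_names == ["box2d", "box3d"]
    assert tm.has_detection_task and not tm.has_dense_prediction_task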
# Copyright 2021 Toyota Research Institute. All rights reserved.
import torch
import torch.nn.functional as F
def compute_features_locations(h, w, stride, dtype=torch.float32, device='cpu', offset="none"):
"""Adapted from AdelaiDet:
https://github.com/aim-uofa/AdelaiDet/blob/master/adet/utils/comm.py
    Key difference: offset is configurable.
"""
shifts_x = torch.arange(0, w * stride, step=stride, dtype=dtype, device=device)
shifts_y = torch.arange(0, h * stride, step=stride, dtype=dtype, device=device)
shift_y, shift_x = torch.meshgrid(shifts_y, shifts_x)
shift_x = shift_x.reshape(-1)
shift_y = shift_y.reshape(-1)
# (dennis.park)
# locations = torch.stack((shift_x, shift_y), dim=1) + stride // 2
locations = torch.stack((shift_x, shift_y), dim=1)
if offset == "half":
locations += stride // 2
else:
assert offset == "none"
return locations
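# Hedged usage sketch (not part of the original file): locations of a 16x24 feature
# map at stride 8; with offset="none" the first location is the origin.
def _example_compute_features_locations():
    locs = compute_features_locations(16, 24, stride=8, offset="none")
    assert locs.shape == (16 * 24, 2)
    assert torch.equal(locs[0], torch.tensor([0., 0.]))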
def aligned_bilinear(tensor, factor, offset="none"):
"""Adapted from AdelaiDet:
https://github.com/aim-uofa/AdelaiDet/blob/master/adet/utils/comm.py
"""
assert tensor.dim() == 4
assert factor >= 1
assert int(factor) == factor
if factor == 1:
return tensor
h, w = tensor.size()[2:]
tensor = F.pad(tensor, pad=(0, 1, 0, 1), mode="replicate")
oh = factor * h + 1
ow = factor * w + 1
tensor = F.interpolate(tensor, size=(oh, ow), mode='bilinear', align_corners=True)
if offset == "half":
tensor = F.pad(tensor, pad=(factor // 2, 0, factor // 2, 0), mode="replicate")
return tensor[:, :, :oh - 1, :ow - 1]
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
# Copyright 2021 Toyota Research Institute. All rights reserved.
import colorsys
import os
import cv2
import matplotlib.colors as mplc
import numpy as np
from PIL import Image, ImageDraw
def fill_color_polygon(image, polygon, color, alpha=0.5):
"""Color interior of polygon with alpha-blending. This function modified input in place.
"""
_mask = Image.new('L', (image.shape[1], image.shape[0]), 0)
ImageDraw.Draw(_mask).polygon(polygon, outline=1, fill=1)
    mask = np.array(_mask, dtype=bool)
for c in range(3):
channel = image[:, :, c]
channel[mask] = channel[mask] * (1. - alpha) + color[c] * alpha
def change_color_brightness(color, brightness_factor):
"""
Copied from detectron2.utils.visualizer.py
-------------------------------------------
Depending on the brightness_factor, gives a lighter or darker color i.e. a color with
less or more saturation than the original color.
Args:
color: color of the polygon. Refer to `matplotlib.colors` for a full list of
formats that are accepted.
brightness_factor (float): a value in [-1.0, 1.0] range. A lightness factor of
0 will correspond to no change, a factor in [-1.0, 0) range will result in
a darker color and a factor in (0, 1.0] range will result in a lighter color.
Returns:
modified_color (tuple[double]): a tuple containing the RGB values of the
modified color. Each value in the tuple is in the [0.0, 1.0] range.
"""
assert brightness_factor >= -1.0 and brightness_factor <= 1.0
color = mplc.to_rgb(color)
polygon_color = colorsys.rgb_to_hls(*mplc.to_rgb(color))
modified_lightness = polygon_color[1] + (brightness_factor * polygon_color[1])
modified_lightness = 0.0 if modified_lightness < 0.0 else modified_lightness
modified_lightness = 1.0 if modified_lightness > 1.0 else modified_lightness
modified_color = colorsys.hls_to_rgb(polygon_color[0], modified_lightness, polygon_color[2])
return modified_color
def draw_text(ax, text, position, *, font_size, color="g", horizontal_alignment="center", rotation=0):
"""
Copied from Visualizer.draw_text()
-----------------------------------
Args:
text (str): class label
position (tuple): a tuple of the x and y coordinates to place text on image.
font_size (int, optional): font of the text. If not provided, a font size
proportional to the image width is calculated and used.
color: color of the text. Refer to `matplotlib.colors` for full list
of formats that are accepted.
horizontal_alignment (str): see `matplotlib.text.Text`
rotation: rotation angle in degrees CCW
Returns:
output (VisImage): image object with text drawn.
"""
# since the text background is dark, we don't want the text to be dark
color = np.maximum(list(mplc.to_rgb(color)), 0.2)
color[np.argmax(color)] = max(0.8, np.max(color))
x, y = position
ax.text(
x,
y,
text,
size=font_size,
family="sans-serif",
bbox={
"facecolor": "black",
"alpha": 0.8,
"pad": 0.7,
"edgecolor": "none"
},
verticalalignment="top",
horizontalalignment=horizontal_alignment,
color=color,
zorder=10,
rotation=rotation,
)
return ax
def float_to_uint8_color(float_clr):
assert all([c >= 0. for c in float_clr])
assert all([c <= 1. for c in float_clr])
return [int(c * 255.) for c in float_clr]
def mosaic(items, scale=1.0, pad=3, grid_width=None):
"""Creates a mosaic from list of images.
Parameters
----------
items: list of np.ndarray
List of images to mosaic.
scale: float, default=1.0
Scale factor applied to images. scale > 1.0 enlarges images.
pad: int, default=3
Padding size of the images before mosaic
grid_width: int, default=None
        Number of tiles per row (grid width) of the mosaic.
Returns
-------
image: np.array of shape (H, W, 3)
Image mosaic
"""
# Determine tile width and height
N = len(items)
assert N > 0, 'No items to mosaic!'
grid_width = grid_width if grid_width else np.ceil(np.sqrt(N)).astype(int)
    grid_height = np.ceil(N * 1. / grid_width).astype(int)
input_size = items[0].shape[:2]
target_shape = (int(input_size[1] * scale), int(input_size[0] * scale))
mosaic_items = []
for j in range(grid_width * grid_height):
if j < N:
            # Every tile is resized to the same target shape, derived from the
            # first image and the scale factor
im = cv2.resize(items[j], dsize=target_shape)
mosaic_items.append(im)
else:
mosaic_items.append(np.zeros_like(mosaic_items[-1]))
# Stack W tiles horizontally first, then vertically
im_pad = lambda im: cv2.copyMakeBorder(im, pad, pad, pad, pad, cv2.BORDER_CONSTANT, 0)
mosaic_items = [im_pad(im) for im in mosaic_items]
hstack = [np.hstack(mosaic_items[j:j + grid_width]) for j in range(0, len(mosaic_items), grid_width)]
mosaic_viz = np.vstack(hstack) if len(hstack) > 1 \
else hstack[0]
return mosaic_viz
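# Hedged usage sketch (not part of the original file): tile five dummy images into
# a 3-wide grid; missing cells are filled with black tiles. The helper name and
# image sizes are hypothetical.
def _example_mosaic():
    imgs = [np.full((90, 160, 3), i * 40, dtype=np.uint8) for i in range(5)]
    return mosaic(imgs, scale=1.0, pad=3, grid_width=3)  # -> (192, 498, 3) uint8 array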
from .vovnet import VoVNet
__all__ = ['VoVNet']
import warnings
from collections import OrderedDict
from mmcv.runner import BaseModule
from mmdet.models.builder import BACKBONES
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules.batchnorm import _BatchNorm
VoVNet19_slim_dw_eSE = {
'stem': [64, 64, 64],
'stage_conv_ch': [64, 80, 96, 112],
'stage_out_ch': [112, 256, 384, 512],
"layer_per_block": 3,
"block_per_stage": [1, 1, 1, 1],
"eSE": True,
"dw": True
}
VoVNet19_dw_eSE = {
'stem': [64, 64, 64],
"stage_conv_ch": [128, 160, 192, 224],
"stage_out_ch": [256, 512, 768, 1024],
"layer_per_block": 3,
"block_per_stage": [1, 1, 1, 1],
"eSE": True,
"dw": True
}
VoVNet19_slim_eSE = {
'stem': [64, 64, 128],
'stage_conv_ch': [64, 80, 96, 112],
'stage_out_ch': [112, 256, 384, 512],
'layer_per_block': 3,
'block_per_stage': [1, 1, 1, 1],
'eSE': True,
"dw": False
}
VoVNet19_eSE = {
'stem': [64, 64, 128],
"stage_conv_ch": [128, 160, 192, 224],
"stage_out_ch": [256, 512, 768, 1024],
"layer_per_block": 3,
"block_per_stage": [1, 1, 1, 1],
"eSE": True,
"dw": False
}
VoVNet39_eSE = {
'stem': [64, 64, 128],
"stage_conv_ch": [128, 160, 192, 224],
"stage_out_ch": [256, 512, 768, 1024],
"layer_per_block": 5,
"block_per_stage": [1, 1, 2, 2],
"eSE": True,
"dw": False
}
VoVNet57_eSE = {
'stem': [64, 64, 128],
"stage_conv_ch": [128, 160, 192, 224],
"stage_out_ch": [256, 512, 768, 1024],
"layer_per_block": 5,
"block_per_stage": [1, 1, 4, 3],
"eSE": True,
"dw": False
}
VoVNet99_eSE = {
'stem': [64, 64, 128],
"stage_conv_ch": [128, 160, 192, 224],
"stage_out_ch": [256, 512, 768, 1024],
"layer_per_block": 5,
"block_per_stage": [1, 3, 9, 3],
"eSE": True,
"dw": False
}
_STAGE_SPECS = {
"V-19-slim-dw-eSE": VoVNet19_slim_dw_eSE,
"V-19-dw-eSE": VoVNet19_dw_eSE,
"V-19-slim-eSE": VoVNet19_slim_eSE,
"V-19-eSE": VoVNet19_eSE,
"V-39-eSE": VoVNet39_eSE,
"V-57-eSE": VoVNet57_eSE,
"V-99-eSE": VoVNet99_eSE,
}
def dw_conv3x3(in_channels, out_channels, module_name, postfix, stride=1, kernel_size=3, padding=1):
"""3x3 convolution with padding"""
return [
(
'{}_{}/dw_conv3x3'.format(module_name, postfix),
nn.Conv2d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=out_channels,
bias=False
)
),
(
'{}_{}/pw_conv1x1'.format(module_name, postfix),
nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, groups=1, bias=False)
),
('{}_{}/pw_norm'.format(module_name, postfix), nn.BatchNorm2d(out_channels)),
('{}_{}/pw_relu'.format(module_name, postfix), nn.ReLU(inplace=True)),
]
def conv3x3(in_channels, out_channels, module_name, postfix, stride=1, groups=1, kernel_size=3, padding=1):
"""3x3 convolution with padding"""
return [
(
f"{module_name}_{postfix}/conv",
nn.Conv2d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=groups,
bias=False,
),
),
(f"{module_name}_{postfix}/norm", nn.BatchNorm2d(out_channels)),
(f"{module_name}_{postfix}/relu", nn.ReLU(inplace=True)),
]
def conv1x1(in_channels, out_channels, module_name, postfix, stride=1, groups=1, kernel_size=1, padding=0):
"""1x1 convolution with padding"""
return [
(
f"{module_name}_{postfix}/conv",
nn.Conv2d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=groups,
bias=False,
),
),
(f"{module_name}_{postfix}/norm", nn.BatchNorm2d(out_channels)),
(f"{module_name}_{postfix}/relu", nn.ReLU(inplace=True)),
]
class Hsigmoid(nn.Module):
def __init__(self, inplace=True):
super(Hsigmoid, self).__init__()
self.inplace = inplace
def forward(self, x):
return F.relu6(x + 3.0, inplace=self.inplace) / 6.0
class eSEModule(nn.Module):
def __init__(self, channel, reduction=4):
super(eSEModule, self).__init__()
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.fc = nn.Conv2d(channel, channel, kernel_size=1, padding=0)
self.hsigmoid = Hsigmoid()
def forward(self, x):
input = x
x = self.avg_pool(x)
x = self.fc(x)
x = self.hsigmoid(x)
return input * x
class _OSA_module(nn.Module):
def __init__(
self, in_ch, stage_ch, concat_ch, layer_per_block, module_name, SE=False, identity=False, depthwise=False
):
super(_OSA_module, self).__init__()
self.identity = identity
self.depthwise = depthwise
self.isReduced = False
self.layers = nn.ModuleList()
in_channel = in_ch
if self.depthwise and in_channel != stage_ch:
self.isReduced = True
self.conv_reduction = nn.Sequential(
OrderedDict(conv1x1(in_channel, stage_ch, "{}_reduction".format(module_name), "0"))
)
for i in range(layer_per_block):
if self.depthwise:
self.layers.append(nn.Sequential(OrderedDict(dw_conv3x3(stage_ch, stage_ch, module_name, i))))
else:
self.layers.append(nn.Sequential(OrderedDict(conv3x3(in_channel, stage_ch, module_name, i))))
in_channel = stage_ch
# feature aggregation
in_channel = in_ch + layer_per_block * stage_ch
self.concat = nn.Sequential(OrderedDict(conv1x1(in_channel, concat_ch, module_name, "concat")))
self.ese = eSEModule(concat_ch)
def forward(self, x):
identity_feat = x
output = []
output.append(x)
if self.depthwise and self.isReduced:
x = self.conv_reduction(x)
for layer in self.layers:
x = layer(x)
output.append(x)
x = torch.cat(output, dim=1)
xt = self.concat(x)
xt = self.ese(xt)
if self.identity:
xt = xt + identity_feat
return xt
class _OSA_stage(nn.Sequential):
def __init__(
self, in_ch, stage_ch, concat_ch, block_per_stage, layer_per_block, stage_num, SE=False, depthwise=False
):
super(_OSA_stage, self).__init__()
if not stage_num == 2:
self.add_module("Pooling", nn.MaxPool2d(kernel_size=3, stride=2, ceil_mode=True))
if block_per_stage != 1:
SE = False
module_name = f"OSA{stage_num}_1"
self.add_module(
module_name, _OSA_module(in_ch, stage_ch, concat_ch, layer_per_block, module_name, SE, depthwise=depthwise)
)
for i in range(block_per_stage - 1):
if i != block_per_stage - 2: # last block
SE = False
module_name = f"OSA{stage_num}_{i + 2}"
self.add_module(
module_name,
_OSA_module(
concat_ch,
stage_ch,
concat_ch,
layer_per_block,
module_name,
SE,
identity=True,
depthwise=depthwise
),
)
@BACKBONES.register_module()
class VoVNet(BaseModule):
def __init__(self, spec_name, input_ch=3, out_features=None,
frozen_stages=-1, norm_eval=True, pretrained=None, init_cfg=None):
"""
Args:
input_ch(int) : the number of input channel
out_features (list[str]): name of the layers whose outputs should
be returned in forward. Can be anything in "stem", "stage2" ...
"""
super(VoVNet, self).__init__(init_cfg)
self.frozen_stages = frozen_stages
self.norm_eval = norm_eval
if isinstance(pretrained, str):
warnings.warn('DeprecationWarning: pretrained is deprecated, '
'please use "init_cfg" instead')
self.init_cfg = dict(type='Pretrained', checkpoint=pretrained)
stage_specs = _STAGE_SPECS[spec_name]
stem_ch = stage_specs["stem"]
config_stage_ch = stage_specs["stage_conv_ch"]
config_concat_ch = stage_specs["stage_out_ch"]
block_per_stage = stage_specs["block_per_stage"]
layer_per_block = stage_specs["layer_per_block"]
SE = stage_specs["eSE"]
depthwise = stage_specs["dw"]
self._out_features = out_features
# Stem module
conv_type = dw_conv3x3 if depthwise else conv3x3
stem = conv3x3(input_ch, stem_ch[0], "stem", "1", 2)
stem += conv_type(stem_ch[0], stem_ch[1], "stem", "2", 1)
stem += conv_type(stem_ch[1], stem_ch[2], "stem", "3", 2)
self.add_module("stem", nn.Sequential((OrderedDict(stem))))
current_stirde = 4
self._out_feature_strides = {"stem": current_stirde, "stage2": current_stirde}
self._out_feature_channels = {"stem": stem_ch[2]}
stem_out_ch = [stem_ch[2]]
in_ch_list = stem_out_ch + config_concat_ch[:-1]
# OSA stages
self.stage_names = []
for i in range(4): # num_stages
name = "stage%d" % (i + 2) # stage 2 ... stage 5
self.stage_names.append(name)
self.add_module(
name,
_OSA_stage(
in_ch_list[i],
config_stage_ch[i],
config_concat_ch[i],
block_per_stage[i],
layer_per_block,
i + 2,
SE,
depthwise,
),
)
self._out_feature_channels[name] = config_concat_ch[i]
if not i == 0:
                self._out_feature_strides[name] = current_stride = int(current_stride * 2)
# initialize weights
# self._initialize_weights()
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight)
def forward(self, x):
outputs = {}
x = self.stem(x)
if "stem" in self._out_features:
outputs["stem"] = x
for name in self.stage_names:
x = getattr(self, name)(x)
if name in self._out_features:
outputs[name] = x
return outputs
def _freeze_stages(self):
if self.frozen_stages >= 0:
m = getattr(self, 'stem')
m.eval()
for param in m.parameters():
param.requires_grad = False
for i in range(1, self.frozen_stages + 1):
m = getattr(self, f'stage{i+1}')
m.eval()
for param in m.parameters():
param.requires_grad = False
def train(self, mode=True):
"""Convert the model into training mode while keep normalization layer
freezed."""
super(VoVNet, self).train(mode)
self._freeze_stages()
if mode and self.norm_eval:
for m in self.modules():
                # trick: eval() has an effect on BatchNorm layers only
if isinstance(m, _BatchNorm):
m.eval()
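# Hedged usage sketch (not part of the original file): a minimal mmdet-style
# backbone config for this class; the checkpoint path is hypothetical.
_EXAMPLE_VOVNET_BACKBONE_CFG = dict(
    type='VoVNet',
    spec_name='V-99-eSE',
    out_features=['stage2', 'stage3', 'stage4', 'stage5'],
    frozen_stages=-1,
    norm_eval=True,
    init_cfg=dict(type='Pretrained', checkpoint='path/to/vovnet99.pth'),  # hypothetical path
)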
from .hooks import GradChecker
from mmcv.runner.hooks.hook import HOOKS, Hook
from projects.mmdet3d_plugin.models.utils import run_time
@HOOKS.register_module()
class GradChecker(Hook):
def after_train_iter(self, runner):
for key, val in runner.model.named_parameters():
            if val.grad is None and val.requires_grad:
                print('WARNING: {key} received no gradient; its parameters are not being used!'.format(key=key))
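# Hedged usage sketch (not part of the original file): the hook can be enabled from
# an mmcv-style config via custom_hooks; the priority value is illustrative.
_EXAMPLE_GRAD_CHECKER_CFG = dict(type='GradChecker', priority='LOWEST')
# custom_hooks = [_EXAMPLE_GRAD_CHECKER_CFG]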
from .adamw import AdamW2
try:
    from torch.optim import _functional as F
except ImportError:
    print('WARNING: torch.optim._functional is unavailable; torch>=1.8 is recommended.')
import torch
from torch.optim.optimizer import Optimizer
from mmcv.runner.optimizer.builder import OPTIMIZERS
@OPTIMIZERS.register_module()
class AdamW2(Optimizer):
r"""Implements AdamW algorithm. Solve the bug of torch 1.8
The original Adam algorithm was proposed in `Adam: A Method for Stochastic Optimization`_.
The AdamW variant was proposed in `Decoupled Weight Decay Regularization`_.
Args:
params (iterable): iterable of parameters to optimize or dicts defining
parameter groups
lr (float, optional): learning rate (default: 1e-3)
betas (Tuple[float, float], optional): coefficients used for computing
running averages of gradient and its square (default: (0.9, 0.999))
eps (float, optional): term added to the denominator to improve
numerical stability (default: 1e-8)
weight_decay (float, optional): weight decay coefficient (default: 1e-2)
amsgrad (boolean, optional): whether to use the AMSGrad variant of this
algorithm from the paper `On the Convergence of Adam and Beyond`_
(default: False)
.. _Adam\: A Method for Stochastic Optimization:
https://arxiv.org/abs/1412.6980
.. _Decoupled Weight Decay Regularization:
https://arxiv.org/abs/1711.05101
.. _On the Convergence of Adam and Beyond:
https://openreview.net/forum?id=ryQu7f-RZ
"""
def __init__(self, params, lr=1e-3, betas=(0.9, 0.999), eps=1e-8,
weight_decay=1e-2, amsgrad=False):
if not 0.0 <= lr:
raise ValueError("Invalid learning rate: {}".format(lr))
if not 0.0 <= eps:
raise ValueError("Invalid epsilon value: {}".format(eps))
if not 0.0 <= betas[0] < 1.0:
raise ValueError("Invalid beta parameter at index 0: {}".format(betas[0]))
if not 0.0 <= betas[1] < 1.0:
raise ValueError("Invalid beta parameter at index 1: {}".format(betas[1]))
if not 0.0 <= weight_decay:
raise ValueError("Invalid weight_decay value: {}".format(weight_decay))
defaults = dict(lr=lr, betas=betas, eps=eps,
weight_decay=weight_decay, amsgrad=amsgrad)
super(AdamW2, self).__init__(params, defaults)
def __setstate__(self, state):
super(AdamW2, self).__setstate__(state)
for group in self.param_groups:
group.setdefault('amsgrad', False)
@torch.no_grad()
def step(self, closure=None):
"""Performs a single optimization step.
Args:
closure (callable, optional): A closure that reevaluates the model
and returns the loss.
"""
loss = None
if closure is not None:
with torch.enable_grad():
loss = closure()
for group in self.param_groups:
params_with_grad = []
grads = []
exp_avgs = []
exp_avg_sqs = []
state_sums = []
max_exp_avg_sqs = []
state_steps = []
amsgrad = group['amsgrad']
            # Unpack betas once per parameter group; this placement works around the torch 1.8 bug.
beta1, beta2 = group['betas']
for p in group['params']:
if p.grad is None:
continue
params_with_grad.append(p)
if p.grad.is_sparse:
raise RuntimeError('AdamW does not support sparse gradients')
grads.append(p.grad)
state = self.state[p]
# State initialization
if len(state) == 0:
state['step'] = 0
# Exponential moving average of gradient values
state['exp_avg'] = torch.zeros_like(p, memory_format=torch.preserve_format)
# Exponential moving average of squared gradient values
state['exp_avg_sq'] = torch.zeros_like(p, memory_format=torch.preserve_format)
if amsgrad:
# Maintains max of all exp. moving avg. of sq. grad. values
state['max_exp_avg_sq'] = torch.zeros_like(p, memory_format=torch.preserve_format)
exp_avgs.append(state['exp_avg'])
exp_avg_sqs.append(state['exp_avg_sq'])
if amsgrad:
max_exp_avg_sqs.append(state['max_exp_avg_sq'])
# update the steps for each param group update
state['step'] += 1
# record the step after step update
state_steps.append(state['step'])
F.adamw(params_with_grad,
grads,
exp_avgs,
exp_avg_sqs,
max_exp_avg_sqs,
state_steps,
amsgrad,
beta1,
beta2,
group['lr'],
group['weight_decay'],
group['eps'])
return loss
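# Hedged usage sketch (not part of the original file): select this optimizer from
# an mmcv-style config; the hyper-parameters are illustrative only.
_EXAMPLE_OPTIMIZER_CFG = dict(type='AdamW2', lr=2e-4, betas=(0.9, 0.999), weight_decay=0.01)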
from .bricks import run_time
from .grid_mask import GridMask
from .position_embedding import RelPositionEmbedding
from .visual import save_tensor
import functools
import time
from collections import defaultdict
import torch
time_maps = defaultdict(lambda :0.)
count_maps = defaultdict(lambda :0.)
def run_time(name):
    def middle(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Synchronize before and after the call so CUDA kernels are included in the timing.
            torch.cuda.synchronize()
            start = time.time()
            res = fn(*args, **kwargs)
            torch.cuda.synchronize()
            key = '%s : %s' % (name, fn.__name__)
            time_maps[key] += time.time() - start
            count_maps[key] += 1
            print("%s takes %f s on average" % (key, time_maps[key] / count_maps[key]))
            return res
        return wrapper
    return middle
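# Hedged usage sketch (not part of the original file): time a toy workload; the
# decorator synchronizes CUDA, so a GPU is assumed to be available. The helper
# and workload names are hypothetical.
def _example_run_time():
    @run_time('demo')
    def _matmul(a, b):  # hypothetical workload
        return a @ b
    x = torch.randn(512, 512, device='cuda')
    return _matmul(x, x)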
import torch
import torch.nn as nn
import numpy as np
from PIL import Image
from mmcv.runner import force_fp32, auto_fp16
class Grid(object):
def __init__(self, use_h, use_w, rotate = 1, offset=False, ratio = 0.5, mode=0, prob = 1.):
self.use_h = use_h
self.use_w = use_w
self.rotate = rotate
self.offset = offset
self.ratio = ratio
self.mode=mode
self.st_prob = prob
self.prob = prob
def set_prob(self, epoch, max_epoch):
self.prob = self.st_prob * epoch / max_epoch
def __call__(self, img, label):
if np.random.rand() > self.prob:
return img, label
h = img.size(1)
w = img.size(2)
self.d1 = 2
self.d2 = min(h, w)
hh = int(1.5*h)
ww = int(1.5*w)
d = np.random.randint(self.d1, self.d2)
if self.ratio == 1:
self.l = np.random.randint(1, d)
else:
self.l = min(max(int(d*self.ratio+0.5),1),d-1)
mask = np.ones((hh, ww), np.float32)
st_h = np.random.randint(d)
st_w = np.random.randint(d)
if self.use_h:
for i in range(hh//d):
s = d*i + st_h
t = min(s+self.l, hh)
mask[s:t,:] *= 0
if self.use_w:
for i in range(ww//d):
s = d*i + st_w
t = min(s+self.l, ww)
mask[:,s:t] *= 0
r = np.random.randint(self.rotate)
mask = Image.fromarray(np.uint8(mask))
mask = mask.rotate(r)
mask = np.asarray(mask)
mask = mask[(hh-h)//2:(hh-h)//2+h, (ww-w)//2:(ww-w)//2+w]
mask = torch.from_numpy(mask).float()
if self.mode == 1:
mask = 1-mask
mask = mask.expand_as(img)
if self.offset:
offset = torch.from_numpy(2 * (np.random.rand(h,w) - 0.5)).float()
offset = (1 - mask) * offset
img = img * mask + offset
else:
img = img * mask
return img, label
class GridMask(nn.Module):
def __init__(self, use_h, use_w, rotate = 1, offset=False, ratio = 0.5, mode=0, prob = 1.):
super(GridMask, self).__init__()
self.use_h = use_h
self.use_w = use_w
self.rotate = rotate
self.offset = offset
self.ratio = ratio
self.mode = mode
self.st_prob = prob
self.prob = prob
self.fp16_enable = False
def set_prob(self, epoch, max_epoch):
self.prob = self.st_prob * epoch / max_epoch #+ 1.#0.5
@auto_fp16()
def forward(self, x):
if np.random.rand() > self.prob or not self.training:
return x
n,c,h,w = x.size()
x = x.view(-1,h,w)
hh = int(1.5*h)
ww = int(1.5*w)
d = np.random.randint(2, h)
self.l = min(max(int(d*self.ratio+0.5),1),d-1)
mask = np.ones((hh, ww), np.float32)
st_h = np.random.randint(d)
st_w = np.random.randint(d)
if self.use_h:
for i in range(hh//d):
s = d*i + st_h
t = min(s+self.l, hh)
mask[s:t,:] *= 0
if self.use_w:
for i in range(ww//d):
s = d*i + st_w
t = min(s+self.l, ww)
mask[:,s:t] *= 0
r = np.random.randint(self.rotate)
mask = Image.fromarray(np.uint8(mask))
mask = mask.rotate(r)
mask = np.asarray(mask)
mask = mask[(hh-h)//2:(hh-h)//2+h, (ww-w)//2:(ww-w)//2+w]
mask = torch.from_numpy(mask).to(x.dtype).cuda()
if self.mode == 1:
mask = 1-mask
mask = mask.expand_as(x)
if self.offset:
offset = torch.from_numpy(2 * (np.random.rand(h,w) - 0.5)).to(x.dtype).cuda()
x = x * mask + offset * (1 - mask)
else:
x = x * mask
return x.view(n,c,h,w)
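# Hedged usage sketch (not part of the original file): apply GridMask to a batch of
# images during training; the module is a no-op in eval() mode. The batch shape is
# illustrative and a CUDA device is assumed.
def _example_grid_mask():
    gm = GridMask(True, True, rotate=1, offset=False, ratio=0.5, mode=1, prob=0.7)
    gm.train()
    imgs = torch.randn(6, 3, 224, 400).cuda()  # hypothetical multi-view batch
    return gm(imgs)  # same shape, with grid-patterned regions masked or kept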