Commit 5ed5979f authored by bailuo's avatar bailuo
Browse files

readme

parents
Pipeline #3043 failed with stages
in 0 seconds
from .resnet_fpn import ResNetFPN_8_2, ResNetFPN_16_4
def build_backbone(config):
    """Instantiate the local-feature CNN backbone selected by ``config``.

    Args:
        config (dict): must provide 'backbone_type' and 'resolution'; for the
            'ResNetFPN' backbone the 'resnetfpn' entry is handed to the
            backbone constructor.

    Returns:
        nn.Module: ``ResNetFPN_8_2`` for resolution (8, 2) or
        ``ResNetFPN_16_4`` for resolution (16, 4).

    Raises:
        ValueError: if the backbone type or the resolution is not supported.
    """
    if config['backbone_type'] != 'ResNetFPN':
        raise ValueError(f"LOFTR.BACKBONE_TYPE {config['backbone_type']} not supported.")

    resolution = config['resolution']
    # A list such as [8, 2] never compares equal to the tuple (8, 2); normalise
    # it so both spellings select the same backbone.
    if isinstance(resolution, list):
        resolution = tuple(resolution)

    if resolution == (8, 2):
        return ResNetFPN_8_2(config['resnetfpn'])
    if resolution == (16, 4):
        return ResNetFPN_16_4(config['resnetfpn'])

    # An unknown resolution used to fall through and return None, which only
    # surfaced much later as an obscure "'NoneType' is not callable" error.
    raise ValueError(f"LOFTR.RESOLUTION {config['resolution']} not supported.")
import torch.nn as nn
import torch.nn.functional as F
def conv1x1(in_planes, out_planes, stride=1):
    """Return a bias-free 1x1 ``nn.Conv2d`` (zero padding) with the given stride."""
    conv_kwargs = dict(kernel_size=1, stride=stride, padding=0, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_kwargs)
def conv3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1 and the given stride."""
    conv_kwargs = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_kwargs)
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 conv + BatchNorm stages.

    The skip connection is the identity when the block keeps both the spatial
    size (``stride == 1``) and the channel count (``in_planes == planes``).
    Otherwise the input is projected with a strided 1x1 conv + BatchNorm so
    that it can be added to the residual branch.

    Args:
        in_planes (int): number of input channels.
        planes (int): number of output channels.
        stride (int): stride of the first 3x3 conv (and of the projection).
    """

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        self.conv1 = conv3x3(in_planes, planes, stride)
        self.conv2 = conv3x3(planes, planes)
        self.bn1 = nn.BatchNorm2d(planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)

        # Previously only `stride != 1` triggered the projection, so a block
        # with stride 1 but a different channel count crashed in `forward`
        # when adding tensors with mismatching channels. Configurations that
        # already worked build exactly the same modules as before.
        if stride == 1 and in_planes == planes:
            self.downsample = None
        else:
            self.downsample = nn.Sequential(
                conv1x1(in_planes, planes, stride=stride),
                nn.BatchNorm2d(planes)
            )

    def forward(self, x):
        """Return ``relu(skip(x) + residual(x))``."""
        y = x
        y = self.relu(self.bn1(self.conv1(y)))
        y = self.bn2(self.conv2(y))

        if self.downsample is not None:
            x = self.downsample(x)

        return self.relu(x+y)
class ResNetFPN_8_2(nn.Module):
    """
    ResNet+FPN backbone returning features at 1/8 and 1/2 of the input size.
    Every stage stacks two residual blocks.
    """

    def __init__(self, config):
        super().__init__()
        # Config
        block_cls = BasicBlock
        stem_dim = config['initial_dim']
        dims = config['block_dims']

        # Running channel count consumed and updated by `_make_layer`
        self.in_planes = stem_dim

        # Stem + residual stages (the stem conv already halves the resolution)
        self.conv1 = nn.Conv2d(1, stem_dim, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(stem_dim)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(block_cls, dims[0], stride=1)  # 1/2
        self.layer2 = self._make_layer(block_cls, dims[1], stride=2)  # 1/4
        self.layer3 = self._make_layer(block_cls, dims[2], stride=2)  # 1/8

        # FPN lateral (1x1) and smoothing (3x3) convolutions
        self.layer3_outconv = conv1x1(dims[2], dims[2])
        self.layer2_outconv = conv1x1(dims[1], dims[2])
        self.layer2_outconv2 = nn.Sequential(
            conv3x3(dims[2], dims[2]),
            nn.BatchNorm2d(dims[2]),
            nn.LeakyReLU(),
            conv3x3(dims[2], dims[1]),
        )
        self.layer1_outconv = conv1x1(dims[0], dims[1])
        self.layer1_outconv2 = nn.Sequential(
            conv3x3(dims[1], dims[1]),
            nn.BatchNorm2d(dims[1]),
            nn.LeakyReLU(),
            conv3x3(dims[1], dims[0]),
        )

        # Kaiming init for convolutions, unit scale / zero shift for norm layers
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                continue
            if isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def _make_layer(self, block, dim, stride=1):
        """Stack two `block`s; only the first one applies `stride`."""
        first = block(self.in_planes, dim, stride=stride)
        second = block(dim, dim, stride=1)
        self.in_planes = dim
        return nn.Sequential(first, second)

    def forward(self, x):
        """Return ``[coarse_feature (1/8), fine_feature (1/2)]`` for image batch `x`."""
        # ResNet backbone
        stem = self.relu(self.bn1(self.conv1(x)))
        feat_2 = self.layer1(stem)     # 1/2
        feat_4 = self.layer2(feat_2)   # 1/4
        feat_8 = self.layer3(feat_4)   # 1/8

        # FPN top-down pathway
        coarse = self.layer3_outconv(feat_8)

        coarse_up = F.interpolate(coarse, scale_factor=2., mode='bilinear', align_corners=True)
        lateral_4 = self.layer2_outconv(feat_4)
        merged_4 = self.layer2_outconv2(lateral_4 + coarse_up)

        merged_4_up = F.interpolate(merged_4, scale_factor=2., mode='bilinear', align_corners=True)
        lateral_2 = self.layer1_outconv(feat_2)
        fine = self.layer1_outconv2(lateral_2 + merged_4_up)

        return [coarse, fine]
class ResNetFPN_16_4(nn.Module):
    """
    ResNet+FPN backbone returning features at 1/16 and 1/4 of the input size.
    Every stage stacks two residual blocks.
    """

    def __init__(self, config):
        super().__init__()
        # Config
        block_cls = BasicBlock
        stem_dim = config['initial_dim']
        dims = config['block_dims']

        # Running channel count consumed and updated by `_make_layer`
        self.in_planes = stem_dim

        # Stem + residual stages (the stem conv already halves the resolution)
        self.conv1 = nn.Conv2d(1, stem_dim, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(stem_dim)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(block_cls, dims[0], stride=1)  # 1/2
        self.layer2 = self._make_layer(block_cls, dims[1], stride=2)  # 1/4
        self.layer3 = self._make_layer(block_cls, dims[2], stride=2)  # 1/8
        self.layer4 = self._make_layer(block_cls, dims[3], stride=2)  # 1/16

        # FPN lateral (1x1) and smoothing (3x3) convolutions
        self.layer4_outconv = conv1x1(dims[3], dims[3])
        self.layer3_outconv = conv1x1(dims[2], dims[3])
        self.layer3_outconv2 = nn.Sequential(
            conv3x3(dims[3], dims[3]),
            nn.BatchNorm2d(dims[3]),
            nn.LeakyReLU(),
            conv3x3(dims[3], dims[2]),
        )

        self.layer2_outconv = conv1x1(dims[1], dims[2])
        self.layer2_outconv2 = nn.Sequential(
            conv3x3(dims[2], dims[2]),
            nn.BatchNorm2d(dims[2]),
            nn.LeakyReLU(),
            conv3x3(dims[2], dims[1]),
        )

        # Kaiming init for convolutions, unit scale / zero shift for norm layers
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                continue
            if isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def _make_layer(self, block, dim, stride=1):
        """Stack two `block`s; only the first one applies `stride`."""
        first = block(self.in_planes, dim, stride=stride)
        second = block(dim, dim, stride=1)
        self.in_planes = dim
        return nn.Sequential(first, second)

    def forward(self, x):
        """Return ``[coarse_feature (1/16), fine_feature (1/4)]`` for image batch `x`."""
        # ResNet backbone
        stem = self.relu(self.bn1(self.conv1(x)))
        feat_2 = self.layer1(stem)      # 1/2
        feat_4 = self.layer2(feat_2)    # 1/4
        feat_8 = self.layer3(feat_4)    # 1/8
        feat_16 = self.layer4(feat_8)   # 1/16

        # FPN top-down pathway
        coarse = self.layer4_outconv(feat_16)

        coarse_up = F.interpolate(coarse, scale_factor=2., mode='bilinear', align_corners=True)
        lateral_8 = self.layer3_outconv(feat_8)
        merged_8 = self.layer3_outconv2(lateral_8 + coarse_up)

        merged_8_up = F.interpolate(merged_8, scale_factor=2., mode='bilinear', align_corners=True)
        lateral_4 = self.layer2_outconv(feat_4)
        fine = self.layer2_outconv2(lateral_4 + merged_8_up)

        return [coarse, fine]
import torch
import torch.nn as nn
from einops.einops import rearrange
from .backbone import build_backbone
from .utils.position_encoding import PositionEncodingSine
from .loftr_module import LocalFeatureTransformer, FinePreprocess
from .utils.coarse_matching import CoarseMatching
from .utils.fine_matching import FineMatching
class LoFTR(nn.Module):
    """Detector-free matcher: CNN backbone, coarse transformer and matching,
    followed by fine-level refinement. All results are written into the
    ``data`` dict passed to :meth:`forward`.
    """

    def __init__(self, config):
        """Build every sub-module from ``config``.

        Args:
            config (dict): reads 'coarse', 'match_coarse' and 'fine'
                sub-configs here; the whole dict is also forwarded to
                ``build_backbone`` and ``FinePreprocess``.
        """
        super().__init__()
        # Misc
        self.config = config

        # Modules
        self.backbone = build_backbone(config)
        self.pos_encoding = PositionEncodingSine(
            config['coarse']['d_model'],
            temp_bug_fix=config['coarse']['temp_bug_fix'])
        self.loftr_coarse = LocalFeatureTransformer(config['coarse'])
        self.coarse_matching = CoarseMatching(config['match_coarse'])
        self.fine_preprocess = FinePreprocess(config)
        self.loftr_fine = LocalFeatureTransformer(config["fine"])
        self.fine_matching = FineMatching()

    def forward(self, data):
        """
        Update:
            data (dict): {
                'image0': (torch.Tensor): (N, 1, H, W)
                'image1': (torch.Tensor): (N, 1, H, W)
                'mask0'(optional) : (torch.Tensor): (N, H, W) '0' indicates a padded position
                'mask1'(optional) : (torch.Tensor): (N, H, W)
            }
        Returns nothing; the batch size, feature-map sizes and the outputs of
        the coarse and fine matching modules are stored in ``data``.
        """
        # 1. Local Feature CNN
        data.update({
            'bs': data['image0'].size(0),
            'hw0_i': data['image0'].shape[2:], 'hw1_i': data['image1'].shape[2:]
        })

        if data['hw0_i'] == data['hw1_i']:  # faster & better BN convergence
            # same size: run both images through the backbone as one batch
            feats_c, feats_f = self.backbone(torch.cat([data['image0'], data['image1']], dim=0))
            (feat_c0, feat_c1), (feat_f0, feat_f1) = feats_c.split(data['bs']), feats_f.split(data['bs'])
        else:  # handle different input shapes
            (feat_c0, feat_f0), (feat_c1, feat_f1) = self.backbone(data['image0']), self.backbone(data['image1'])

        data.update({
            'hw0_c': feat_c0.shape[2:], 'hw1_c': feat_c1.shape[2:],
            'hw0_f': feat_f0.shape[2:], 'hw1_f': feat_f1.shape[2:]
        })

        # 2. coarse-level loftr module
        # add featmap with positional encoding, then flatten it to sequence [N, HW, C]
        feat_c0 = rearrange(self.pos_encoding(feat_c0), 'n c h w -> n (h w) c')
        feat_c1 = rearrange(self.pos_encoding(feat_c1), 'n c h w -> n (h w) c')

        mask_c0 = mask_c1 = None  # mask is useful in training
        if 'mask0' in data:
            mask_c0, mask_c1 = data['mask0'].flatten(-2), data['mask1'].flatten(-2)
        feat_c0, feat_c1 = self.loftr_coarse(feat_c0, feat_c1, mask_c0, mask_c1)

        # 3. match coarse-level
        self.coarse_matching(feat_c0, feat_c1, data, mask_c0=mask_c0, mask_c1=mask_c1)

        # 4. fine-level refinement
        feat_f0_unfold, feat_f1_unfold = self.fine_preprocess(feat_f0, feat_f1, feat_c0, feat_c1, data)
        if feat_f0_unfold.size(0) != 0:  # at least one coarse level predicted
            feat_f0_unfold, feat_f1_unfold = self.loftr_fine(feat_f0_unfold, feat_f1_unfold)

        # 5. match fine-level
        self.fine_matching(feat_f0_unfold, feat_f1_unfold, data)

    def load_state_dict(self, state_dict, *args, **kwargs):
        """Load weights, accepting keys carrying a leading ``'matcher.'`` prefix.

        The prefix is stripped on a shallow copy, so the mapping passed by the
        caller is left untouched (it used to be rewritten in place).

        Args:
            state_dict (Mapping[str, Tensor]): parameters and buffers.
            *args, **kwargs: forwarded to ``nn.Module.load_state_dict``.

        Returns:
            Whatever ``nn.Module.load_state_dict`` returns.
        """
        from collections import OrderedDict

        remapped = OrderedDict(state_dict)
        for k in list(remapped.keys()):
            if k.startswith('matcher.'):
                remapped[k.replace('matcher.', '', 1)] = remapped.pop(k)

        # nn.Module reads the optional `_metadata` attribute from the mapping;
        # carry it over because a plain copy does not preserve it.
        metadata = getattr(state_dict, '_metadata', None)
        if metadata is not None:
            remapped._metadata = metadata
        return super().load_state_dict(remapped, *args, **kwargs)
from .transformer import LocalFeatureTransformer
from .fine_preprocess import FinePreprocess
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops.einops import rearrange, repeat
class FinePreprocess(nn.Module):
    """Crop a WxW fine-level feature window around every coarse match and,
    optionally, fuse it with the matching coarse-level feature.
    """

    def __init__(self, config):
        """
        Args:
            config (dict): reads 'fine_concat_coarse_feat', 'fine_window_size',
                config['coarse']['d_model'] and config['fine']['d_model'].
        """
        super().__init__()

        self.config = config
        self.cat_c_feat = config['fine_concat_coarse_feat']
        self.W = self.config['fine_window_size']

        d_model_c = self.config['coarse']['d_model']
        d_model_f = self.config['fine']['d_model']
        self.d_model_f = d_model_f
        if self.cat_c_feat:
            # project coarse features to the fine width, then merge the
            # concatenated [fine, coarse] pair back to the fine width
            self.down_proj = nn.Linear(d_model_c, d_model_f, bias=True)
            self.merge_feat = nn.Linear(2*d_model_f, d_model_f, bias=True)

        self._reset_parameters()

    def _reset_parameters(self):
        """Kaiming-initialise every parameter with more than one dimension."""
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.kaiming_normal_(p, mode="fan_out", nonlinearity="relu")

    def forward(self, feat_f0, feat_f1, feat_c0, feat_c1, data):
        """
        Args:
            feat_f0, feat_f1 (torch.Tensor): fine feature maps, unfolded with
                `F.unfold` (so expected as [N, C, H, W]).
            feat_c0, feat_c1 (torch.Tensor): coarse features indexed by
                (b_ids, i_ids) / (b_ids, j_ids); only used when
                `fine_concat_coarse_feat` is enabled.
            data (dict): reads 'hw0_f', 'hw0_c', 'b_ids', 'i_ids', 'j_ids';
                'W' is written back.
        Returns:
            (feat_f0_unfold, feat_f1_unfold): each [M, W*W, d_model_f], with
            M the number of coarse matches (M may be 0).
        """
        W = self.W
        # ratio between fine and coarse maps = step between window centres
        stride = data['hw0_f'][0] // data['hw0_c'][0]

        data.update({'W': W})
        if data['b_ids'].shape[0] == 0:
            # No coarse match: return empty windows. Follow the dtype of the
            # input features as well as their device, so that half-precision
            # inference does not get float32 tensors back.
            feat0 = torch.empty(0, self.W**2, self.d_model_f,
                                device=feat_f0.device, dtype=feat_f0.dtype)
            feat1 = torch.empty(0, self.W**2, self.d_model_f,
                                device=feat_f0.device, dtype=feat_f0.dtype)
            return feat0, feat1

        # 1. unfold(crop) all local windows
        feat_f0_unfold = F.unfold(feat_f0, kernel_size=(W, W), stride=stride, padding=W//2)
        feat_f0_unfold = rearrange(feat_f0_unfold, 'n (c ww) l -> n l ww c', ww=W**2)
        feat_f1_unfold = F.unfold(feat_f1, kernel_size=(W, W), stride=stride, padding=W//2)
        feat_f1_unfold = rearrange(feat_f1_unfold, 'n (c ww) l -> n l ww c', ww=W**2)

        # 2. select only the predicted matches
        feat_f0_unfold = feat_f0_unfold[data['b_ids'], data['i_ids']]  # [n, ww, cf]
        feat_f1_unfold = feat_f1_unfold[data['b_ids'], data['j_ids']]

        # option: use coarse-level loftr feature as context: concat and linear
        if self.cat_c_feat:
            feat_c_win = self.down_proj(torch.cat([feat_c0[data['b_ids'], data['i_ids']],
                                                   feat_c1[data['b_ids'], data['j_ids']]], 0))  # [2n, c]
            feat_cf_win = self.merge_feat(torch.cat([
                torch.cat([feat_f0_unfold, feat_f1_unfold], 0),  # [2n, ww, cf]
                repeat(feat_c_win, 'n c -> n ww c', ww=W**2),  # [2n, ww, cf]
            ], -1))
            feat_f0_unfold, feat_f1_unfold = torch.chunk(feat_cf_win, 2, dim=0)

        return feat_f0_unfold, feat_f1_unfold
"""
Linear Transformer proposed in "Transformers are RNNs: Fast Autoregressive Transformers with Linear Attention"
Modified from: https://github.com/idiap/fast-transformers/blob/master/fast_transformers/attention/linear_attention.py
"""
import torch
from torch.nn import Module, Dropout
def elu_feature_map(x):
    """Kernel feature map ``elu(x) + 1`` applied to queries and keys."""
    activated = torch.nn.functional.elu(x)
    return activated + 1
class LinearAttention(Module):
    """Linear attention built on the ``elu(x) + 1`` feature map."""

    def __init__(self, eps=1e-6):
        super().__init__()
        self.feature_map = elu_feature_map
        self.eps = eps

    def forward(self, queries, keys, values, q_mask=None, kv_mask=None):
        """ Multi-Head linear attention proposed in "Transformers are RNNs"
        Args:
            queries: [N, L, H, D]
            keys: [N, S, H, D]
            values: [N, S, H, D]
            q_mask: [N, L]
            kv_mask: [N, S]
        Returns:
            queried_values: (N, L, H, D)
        """
        q_feat = self.feature_map(queries)
        k_feat = self.feature_map(keys)

        # zero out padded positions
        if q_mask is not None:
            q_feat = q_feat * q_mask[:, :, None, None]
        if kv_mask is not None:
            k_feat = k_feat * kv_mask[:, :, None, None]
            values = values * kv_mask[:, :, None, None]

        # divide by the source length before accumulating and multiply back
        # afterwards: prevents fp16 overflow
        src_len = values.size(1)
        scaled_values = values / src_len
        kv = torch.einsum("nshd,nshv->nhdv", k_feat, scaled_values)  # (S,D)' @ S,V
        normalizer = 1 / (torch.einsum("nlhd,nhd->nlh", q_feat, k_feat.sum(dim=1)) + self.eps)
        attended = torch.einsum("nlhd,nhdv,nlh->nlhv", q_feat, kv, normalizer) * src_len

        return attended.contiguous()
class FullAttention(Module):
    """Multi-head scaled dot-product attention with optional padding masks."""

    def __init__(self, use_dropout=False, attention_dropout=0.1):
        """
        Args:
            use_dropout (bool): apply dropout to the attention weights.
            attention_dropout (float): dropout probability.
        """
        super().__init__()
        self.use_dropout = use_dropout
        self.dropout = Dropout(attention_dropout)

    def forward(self, queries, keys, values, q_mask=None, kv_mask=None):
        """ Multi-head scaled dot-product attention, a.k.a full attention.
        Args:
            queries: [N, L, H, D]
            keys: [N, S, H, D]
            values: [N, S, H, D]
            q_mask: [N, L] (optional; only applied together with kv_mask)
            kv_mask: [N, S] (optional; non-zero / True marks a valid position)
        Returns:
            queried_values: (N, L, H, D)
        """
        # Compute the unnormalized attention and apply the masks
        QK = torch.einsum("nlhd,nshd->nlsh", queries, keys)
        if kv_mask is not None:
            # Masks are cast to bool so that 0/1 integer or float masks work
            # too (`~` on a float tensor raises), and q_mask may be omitted
            # (it used to be dereferenced unconditionally).
            valid = kv_mask[:, None, :, None].bool()
            if q_mask is not None:
                valid = q_mask[:, :, None, None].bool() & valid
            QK.masked_fill_(~valid, float('-inf'))

        # Compute the attention and the weighted average
        softmax_temp = 1. / queries.size(3)**.5  # sqrt(D)
        A = torch.softmax(softmax_temp * QK, dim=2)
        if self.use_dropout:
            A = self.dropout(A)

        queried_values = torch.einsum("nlsh,nshd->nlhd", A, values)

        return queried_values.contiguous()
import copy
import torch
import torch.nn as nn
from .linear_attention import LinearAttention, FullAttention
class LoFTREncoderLayer(nn.Module):
    """One attention + feed-forward layer; the result is added to the input `x`."""

    def __init__(self,
                 d_model,
                 nhead,
                 attention='linear'):
        super().__init__()

        self.dim = d_model // nhead
        self.nhead = nhead

        # multi-head attention
        self.q_proj = nn.Linear(d_model, d_model, bias=False)
        self.k_proj = nn.Linear(d_model, d_model, bias=False)
        self.v_proj = nn.Linear(d_model, d_model, bias=False)
        if attention == 'linear':
            self.attention = LinearAttention()
        else:
            self.attention = FullAttention()
        self.merge = nn.Linear(d_model, d_model, bias=False)

        # feed-forward network; its input is the concatenation [x, message]
        hidden = d_model*2
        self.mlp = nn.Sequential(
            nn.Linear(hidden, hidden, bias=False),
            nn.ReLU(True),
            nn.Linear(hidden, d_model, bias=False),
        )

        # layer norms applied to the attention message and to the MLP output
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, source, x_mask=None, source_mask=None):
        """
        Args:
            x (torch.Tensor): [N, L, C]
            source (torch.Tensor): [N, S, C]
            x_mask (torch.Tensor): [N, L] (optional)
            source_mask (torch.Tensor): [N, S] (optional)
        Returns:
            torch.Tensor: [N, L, C], `x` plus the computed message
        """
        batch = x.size(0)
        head_shape = (batch, -1, self.nhead, self.dim)

        # multi-head attention: queries from x, keys/values from source
        q = self.q_proj(x).view(*head_shape)        # [N, L, (H, D)]
        k = self.k_proj(source).view(*head_shape)   # [N, S, (H, D)]
        v = self.v_proj(source).view(*head_shape)
        attended = self.attention(q, k, v, q_mask=x_mask, kv_mask=source_mask)  # [N, L, (H, D)]
        merged = self.merge(attended.view(batch, -1, self.nhead*self.dim))  # [N, L, C]
        merged = self.norm1(merged)

        # feed-forward network
        update = self.mlp(torch.cat([x, merged], dim=2))
        update = self.norm2(update)

        return x + update
class LocalFeatureTransformer(nn.Module):
    """A Local Feature Transformer (LoFTR) module: a stack of encoder layers
    applied as self- or cross-attention according to `layer_names`."""

    def __init__(self, config):
        super().__init__()

        self.config = config
        self.d_model = config['d_model']
        self.nhead = config['nhead']
        self.layer_names = config['layer_names']
        # one prototype layer, deep-copied once per entry of `layer_names`
        prototype = LoFTREncoderLayer(config['d_model'], config['nhead'], config['attention'])
        self.layers = nn.ModuleList([copy.deepcopy(prototype) for _ in self.layer_names])
        self._reset_parameters()

    def _reset_parameters(self):
        """Xavier-initialise every parameter with more than one dimension."""
        for param in self.parameters():
            if param.dim() <= 1:
                continue
            nn.init.xavier_uniform_(param)

    def forward(self, feat0, feat1, mask0=None, mask1=None):
        """
        Args:
            feat0 (torch.Tensor): [N, L, C]
            feat1 (torch.Tensor): [N, S, C]
            mask0 (torch.Tensor): [N, L] (optional)
            mask1 (torch.Tensor): [N, S] (optional)
        Returns:
            (feat0, feat1): the transformed features
        Raises:
            KeyError: if a layer name is neither 'self' nor 'cross'
        """
        assert self.d_model == feat0.size(2), "the feature number of src and transformer must be equal"

        for layer, kind in zip(self.layers, self.layer_names):
            if kind == 'self':
                feat0 = layer(feat0, feat0, mask0, mask0)
                feat1 = layer(feat1, feat1, mask1, mask1)
                continue
            if kind != 'cross':
                raise KeyError
            # feat1 attends to the already-updated feat0
            feat0 = layer(feat0, feat1, mask0, mask1)
            feat1 = layer(feat1, feat0, mask1, mask0)

        return feat0, feat1
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops.einops import rearrange
# Large finite stand-in for infinity: similarity entries at masked positions
# are filled with -INF before the softmax / sinkhorn step.
INF = 1e9
def mask_border(m, b: int, v):
    """ Overwrite (in place) a border of width `b` with `v` along dims 1..4
    Args:
        m (torch.Tensor): [N, H0, W0, H1, W1]
        b (int)
        v (m.dtype)
    """
    if b <= 0:
        return

    everything = slice(None)
    for edge in (slice(None, b), slice(-b, None)):  # leading, then trailing border
        for axis in range(1, 5):
            m[(everything,) * axis + (edge,)] = v
def mask_border_with_padding(m, bd, v, p_m0, p_m1):
    """Overwrite (in place) a border of width `bd` with `v`, where the trailing
    border of every sample is measured from the extent derived from the
    padding masks `p_m0` / `p_m1` instead of from the tensor size.
    """
    if bd <= 0:
        return

    everything = slice(None)
    # leading border is shared by all samples
    for axis in range(1, 5):
        m[(everything,) * axis + (slice(None, bd),)] = v

    def _extents(pad_mask):
        # largest column sum / largest row sum of the mask, per sample
        return pad_mask.sum(1).max(-1)[0].int(), pad_mask.sum(-1).max(-1)[0].int()

    h0s, w0s = _extents(p_m0)
    h1s, w1s = _extents(p_m1)
    # trailing border starts `bd` before each sample's own extent
    for b_idx, extents in enumerate(zip(h0s, w0s, h1s, w1s)):
        for axis, extent in enumerate(extents, start=1):
            index = (b_idx,) + (everything,) * (axis - 1) + (slice(extent - bd, None),)
            m[index] = v
def compute_max_candidates(p_m0, p_m1):
    """Compute the max candidates of all pairs within a batch

    Per pair this is the smaller of the two h*w products derived from the
    masks; the result is the sum over the batch.

    Args:
        p_m0, p_m1 (torch.Tensor): padded masks
    """
    def _area(pad_mask):
        height = pad_mask.sum(1).max(-1)[0]
        width = pad_mask.sum(-1).max(-1)[0]
        return height * width

    areas = torch.stack([_area(p_m0), _area(p_m1)], -1)
    smaller = torch.min(areas, -1)[0]
    return torch.sum(smaller)
class CoarseMatching(nn.Module):
    """Turn coarse-level features of two images into a confidence matrix and
    extract mutual-nearest-neighbour matches from it.

    Two differentiable matching schemes are available: 'dual_softmax' and
    'sinkhorn' (the latter needs the optional `superglue` module).
    """

    def __init__(self, config):
        """
        Args:
            config (dict): reads 'thr', 'border_rm', 'train_coarse_percent',
                'train_pad_num_gt_min' and 'match_type', plus
                'dsmax_temperature' (dual_softmax) or 'skh_init_bin_score',
                'skh_iters', 'skh_prefilter' (sinkhorn).
        Raises:
            ImportError: if 'sinkhorn' is requested but `superglue` is missing.
            NotImplementedError: for any other match type.
        """
        super().__init__()
        self.config = config
        # general config
        self.thr = config['thr']
        self.border_rm = config['border_rm']
        # -- # for training fine-level LoFTR
        self.train_coarse_percent = config['train_coarse_percent']
        self.train_pad_num_gt_min = config['train_pad_num_gt_min']

        # we provide 2 options for differentiable matching
        self.match_type = config['match_type']
        if self.match_type == 'dual_softmax':
            self.temperature = config['dsmax_temperature']
        elif self.match_type == 'sinkhorn':
            try:
                from .superglue import log_optimal_transport
            except ImportError:
                raise ImportError("download superglue.py first!")
            self.log_optimal_transport = log_optimal_transport
            # learnable dustbin score passed to log_optimal_transport
            self.bin_score = nn.Parameter(
                torch.tensor(config['skh_init_bin_score'], requires_grad=True))
            self.skh_iters = config['skh_iters']
            self.skh_prefilter = config['skh_prefilter']
        else:
            raise NotImplementedError()

    def forward(self, feat_c0, feat_c1, data, mask_c0=None, mask_c1=None):
        """
        Args:
            feat0 (torch.Tensor): [N, L, C]
            feat1 (torch.Tensor): [N, S, C]
            data (dict)
            mask_c0 (torch.Tensor): [N, L] (optional)
            mask_c1 (torch.Tensor): [N, S] (optional)
        Update:
            data (dict): {
                'b_ids' (torch.Tensor): [M'],
                'i_ids' (torch.Tensor): [M'],
                'j_ids' (torch.Tensor): [M'],
                'gt_mask' (torch.Tensor): [M'],
                'mkpts0_c' (torch.Tensor): [M, 2],
                'mkpts1_c' (torch.Tensor): [M, 2],
                'mconf' (torch.Tensor): [M]}
            NOTE: M' != M during training.
            'conf_matrix' is stored as well (and 'conf_matrix_with_bin' for
            sinkhorn when config['sparse_spvs'] is set).
        """
        N, L, S, C = feat_c0.size(0), feat_c0.size(1), feat_c1.size(1), feat_c0.size(2)

        # normalize: scale both features by 1 / sqrt(C)
        feat_c0, feat_c1 = map(lambda feat: feat / feat.shape[-1]**.5,
                               [feat_c0, feat_c1])

        if self.match_type == 'dual_softmax':
            sim_matrix = torch.einsum("nlc,nsc->nls", feat_c0,
                                      feat_c1) / self.temperature
            if mask_c0 is not None:
                # padded pairs get -INF so they vanish in both softmaxes
                sim_matrix.masked_fill_(
                    ~(mask_c0[..., None] * mask_c1[:, None]).bool(),
                    -INF)
            # product of the softmax over image-0 positions and over image-1 positions
            conf_matrix = F.softmax(sim_matrix, 1) * F.softmax(sim_matrix, 2)

        elif self.match_type == 'sinkhorn':
            # sinkhorn, dustbin included
            sim_matrix = torch.einsum("nlc,nsc->nls", feat_c0, feat_c1)
            if mask_c0 is not None:
                sim_matrix[:, :L, :S].masked_fill_(
                    ~(mask_c0[..., None] * mask_c1[:, None]).bool(),
                    -INF)

            # build uniform prior & use sinkhorn
            log_assign_matrix = self.log_optimal_transport(
                sim_matrix, self.bin_score, self.skh_iters)
            assign_matrix = log_assign_matrix.exp()
            # drop the last row / column of the assignment
            conf_matrix = assign_matrix[:, :-1, :-1]

            # filter prediction with dustbin score (only in evaluation mode)
            if not self.training and self.skh_prefilter:
                filter0 = (assign_matrix.max(dim=2)[1] == S)[:, :-1]  # [N, L]
                filter1 = (assign_matrix.max(dim=1)[1] == L)[:, :-1]  # [N, S]
                conf_matrix[filter0[..., None].repeat(1, 1, S)] = 0
                conf_matrix[filter1[:, None].repeat(1, L, 1)] = 0

            if self.config['sparse_spvs']:
                data.update({'conf_matrix_with_bin': assign_matrix.clone()})

        data.update({'conf_matrix': conf_matrix})

        # predict coarse matches from conf_matrix
        data.update(**self.get_coarse_match(conf_matrix, data))

    @torch.no_grad()
    def get_coarse_match(self, conf_matrix, data):
        """
        Args:
            conf_matrix (torch.Tensor): [N, L, S]
            data (dict): with keys ['hw0_i', 'hw1_i', 'hw0_c', 'hw1_c']
                (optionally 'mask0'/'mask1', 'scale0'/'scale1'; in training
                mode also 'spv_b_ids', 'spv_i_ids', 'spv_j_ids')
        Returns:
            coarse_matches (dict): {
                'b_ids' (torch.Tensor): [M'],
                'i_ids' (torch.Tensor): [M'],
                'j_ids' (torch.Tensor): [M'],
                'gt_mask' (torch.Tensor): [M'],
                'm_bids' (torch.Tensor): [M],
                'mkpts0_c' (torch.Tensor): [M, 2],
                'mkpts1_c' (torch.Tensor): [M, 2],
                'mconf' (torch.Tensor): [M]}
        """
        axes_lengths = {
            'h0c': data['hw0_c'][0],
            'w0c': data['hw0_c'][1],
            'h1c': data['hw1_c'][0],
            'w1c': data['hw1_c'][1]
        }
        _device = conf_matrix.device
        # 1. confidence thresholding
        mask = conf_matrix > self.thr
        # unflatten to the 2D grids so that the border can be cleared per axis
        mask = rearrange(mask, 'b (h0c w0c) (h1c w1c) -> b h0c w0c h1c w1c',
                         **axes_lengths)
        if 'mask0' not in data:
            mask_border(mask, self.border_rm, False)
        else:
            mask_border_with_padding(mask, self.border_rm, False,
                                     data['mask0'], data['mask1'])
        mask = rearrange(mask, 'b h0c w0c h1c w1c -> b (h0c w0c) (h1c w1c)',
                         **axes_lengths)

        # 2. mutual nearest: keep entries that are the maximum of both their
        # row and their column
        mask = mask \
            * (conf_matrix == conf_matrix.max(dim=2, keepdim=True)[0]) \
            * (conf_matrix == conf_matrix.max(dim=1, keepdim=True)[0])

        # 3. find all valid coarse matches
        # this only works when at most one `True` in each row
        mask_v, all_j_ids = mask.max(dim=2)
        b_ids, i_ids = torch.where(mask_v)
        j_ids = all_j_ids[b_ids, i_ids]
        mconf = conf_matrix[b_ids, i_ids, j_ids]

        # 4. Random sampling of training samples for fine-level LoFTR
        # (optional) pad samples with gt coarse-level matches
        if self.training:
            # NOTE:
            # The sampling is performed across all pairs in a batch without manually balancing
            # #samples for fine-level increases w.r.t. batch_size
            if 'mask0' not in data:
                num_candidates_max = mask.size(0) * max(
                    mask.size(1), mask.size(2))
            else:
                num_candidates_max = compute_max_candidates(
                    data['mask0'], data['mask1'])
            num_matches_train = int(num_candidates_max *
                                    self.train_coarse_percent)
            num_matches_pred = len(b_ids)
            assert self.train_pad_num_gt_min < num_matches_train, "min-num-gt-pad should be less than num-train-matches"

            # pred_indices is to select from prediction
            if num_matches_pred <= num_matches_train - self.train_pad_num_gt_min:
                pred_indices = torch.arange(num_matches_pred, device=_device)
            else:
                # too many predictions: draw random indices (with replacement)
                pred_indices = torch.randint(
                    num_matches_pred,
                    (num_matches_train - self.train_pad_num_gt_min, ),
                    device=_device)

            # gt_pad_indices is to select from gt padding. e.g. max(3787-4800, 200)
            gt_pad_indices = torch.randint(
                len(data['spv_b_ids']),
                (max(num_matches_train - num_matches_pred,
                     self.train_pad_num_gt_min), ),
                device=_device)
            mconf_gt = torch.zeros(len(data['spv_b_ids']), device=_device)  # set conf of gt paddings to all zero

            # concatenate the selected predictions with the selected gt paddings
            b_ids, i_ids, j_ids, mconf = map(
                lambda x, y: torch.cat([x[pred_indices], y[gt_pad_indices]],
                                       dim=0),
                *zip([b_ids, data['spv_b_ids']], [i_ids, data['spv_i_ids']],
                     [j_ids, data['spv_j_ids']], [mconf, mconf_gt]))

        # These matches select patches that feed into fine-level network
        coarse_matches = {'b_ids': b_ids, 'i_ids': i_ids, 'j_ids': j_ids}

        # 5. Update with matches in original image resolution
        scale = data['hw0_i'][0] / data['hw0_c'][0]
        scale0 = scale * data['scale0'][b_ids] if 'scale0' in data else scale
        scale1 = scale * data['scale1'][b_ids] if 'scale1' in data else scale
        # flat index -> (x, y) on the coarse grid, then scaled
        mkpts0_c = torch.stack(
            [i_ids % data['hw0_c'][1], i_ids // data['hw0_c'][1]],
            dim=1) * scale0
        mkpts1_c = torch.stack(
            [j_ids % data['hw1_c'][1], j_ids // data['hw1_c'][1]],
            dim=1) * scale1

        # These matches is the current prediction (for visualization)
        coarse_matches.update({
            'gt_mask': mconf == 0,
            'm_bids': b_ids[mconf != 0],  # mconf == 0 => gt matches
            'mkpts0_c': mkpts0_c[mconf != 0],
            'mkpts1_c': mkpts1_c[mconf != 0],
            'mconf': mconf[mconf != 0]
        })

        return coarse_matches
from yacs.config import CfgNode as CN
def lower_config(yacs_cfg):
    """Recursively convert a yacs ``CfgNode`` into a plain dict with
    lower-cased keys; any other value is returned unchanged."""
    if isinstance(yacs_cfg, CN):
        lowered = {}
        for key, value in yacs_cfg.items():
            lowered[key.lower()] = lower_config(value)
        return lowered
    return yacs_cfg
# Default LoFTR configuration. `default_cfg` at the bottom is the same tree
# converted to nested dicts with lower-cased keys.
_CN = CN()
_CN.BACKBONE_TYPE = 'ResNetFPN'
_CN.RESOLUTION = (8, 2)  # options: [(8, 2), (16, 4)]
_CN.FINE_WINDOW_SIZE = 5  # window_size in fine_level, must be odd
_CN.FINE_CONCAT_COARSE_FEAT = True

# 1. LoFTR-backbone (local feature CNN) config
_CN.RESNETFPN = CN()
_CN.RESNETFPN.INITIAL_DIM = 128
# NOTE(review): ResNetFPN_16_4 also indexes block_dims[3], so RESOLUTION
# (16, 4) needs a fourth entry here.
_CN.RESNETFPN.BLOCK_DIMS = [128, 196, 256]  # s1, s2, s3

# 2. LoFTR-coarse module config
_CN.COARSE = CN()
_CN.COARSE.D_MODEL = 256
_CN.COARSE.D_FFN = 256
_CN.COARSE.NHEAD = 8
_CN.COARSE.LAYER_NAMES = ['self', 'cross'] * 4
_CN.COARSE.ATTENTION = 'linear'  # options: ['linear', 'full']
_CN.COARSE.TEMP_BUG_FIX = False

# 3. Coarse-Matching config
_CN.MATCH_COARSE = CN()
_CN.MATCH_COARSE.THR = 0.2
_CN.MATCH_COARSE.BORDER_RM = 2
_CN.MATCH_COARSE.MATCH_TYPE = 'dual_softmax'  # options: ['dual_softmax', 'sinkhorn']
_CN.MATCH_COARSE.DSMAX_TEMPERATURE = 0.1
_CN.MATCH_COARSE.SKH_ITERS = 3
_CN.MATCH_COARSE.SKH_INIT_BIN_SCORE = 1.0
_CN.MATCH_COARSE.SKH_PREFILTER = True
_CN.MATCH_COARSE.TRAIN_COARSE_PERCENT = 0.4  # training tricks: save GPU memory
_CN.MATCH_COARSE.TRAIN_PAD_NUM_GT_MIN = 200  # training tricks: avoid DDP deadlock

# 4. LoFTR-fine module config
_CN.FINE = CN()
_CN.FINE.D_MODEL = 128
_CN.FINE.D_FFN = 128
_CN.FINE.NHEAD = 8
_CN.FINE.LAYER_NAMES = ['self', 'cross'] * 1
_CN.FINE.ATTENTION = 'linear'

default_cfg = lower_config(_CN)
import math
import torch
import torch.nn as nn
from kornia.geometry.subpix import dsnt
from kornia.utils.grid import create_meshgrid
class FineMatching(nn.Module):
    """FineMatching with s2d paradigm"""

    def __init__(self):
        super().__init__()

    def forward(self, feat_f0, feat_f1, data):
        """
        Args:
            feat0 (torch.Tensor): [M, WW, C]
            feat1 (torch.Tensor): [M, WW, C]
            data (dict)
        Update:
            data (dict):{
                'expec_f' (torch.Tensor): [M, 3],
                'mkpts0_f' (torch.Tensor): [M, 2],
                'mkpts1_f' (torch.Tensor): [M, 2]}
        """
        M, WW, C = feat_f0.shape
        W = int(math.sqrt(WW))
        scale = data['hw0_i'][0] / data['hw0_f'][0]
        # cached on the module because get_fine_match reads them back
        self.M, self.W, self.WW, self.C, self.scale = M, W, WW, C, scale

        # corner case: if no coarse matches found
        if M == 0:
            assert not self.training, "M is always >0, when training, see coarse_matching.py"
            # logger.warning('No matches found in coarse-level.')
            data.update({
                'expec_f': torch.empty(0, 3, device=feat_f0.device),
                'mkpts0_f': data['mkpts0_c'],
                'mkpts1_f': data['mkpts1_c'],
            })
            return

        # correlate the centre feature of every window in image 0 with all
        # positions of the matching window in image 1
        feat_f0_picked = feat_f0[:, WW//2, :]
        sim_matrix = torch.einsum('mc,mrc->mr', feat_f0_picked, feat_f1)
        softmax_temp = 1. / C**.5
        heatmap = torch.softmax(softmax_temp * sim_matrix, dim=1).view(-1, W, W)

        # compute coordinates from heatmap
        coords_normalized = dsnt.spatial_expectation2d(heatmap[None], True)[0]  # [M, 2]
        grid_normalized = create_meshgrid(W, W, True, heatmap.device).reshape(1, -1, 2)  # [1, WW, 2]

        # compute std over <x, y>
        var = torch.sum(grid_normalized**2 * heatmap.view(-1, WW, 1), dim=1) - coords_normalized**2  # [M, 2]
        std = torch.sum(torch.sqrt(torch.clamp(var, min=1e-10)), -1)  # [M]  clamp needed for numerical stability

        # for fine-level supervision
        data.update({'expec_f': torch.cat([coords_normalized, std.unsqueeze(1)], -1)})

        # compute absolute kpt coords
        self.get_fine_match(coords_normalized, data)

    @torch.no_grad()
    def get_fine_match(self, coords_normed, data):
        """Write the refined keypoints 'mkpts0_f' / 'mkpts1_f' into ``data``.

        Keypoints of image 0 are kept at their coarse location; keypoints of
        image 1 are shifted by the predicted offset
        ``coords_normed * (W // 2) * scale1``.

        Args:
            coords_normed (torch.Tensor): [M, 2] predicted offsets.
            data (dict): reads 'mkpts0_c', 'mkpts1_c', 'mconf' and, when
                present, 'scale1' together with 'b_ids'.
        """
        W, WW, C, scale = self.W, self.WW, self.C, self.scale

        # mkpts0_f and mkpts1_f
        mkpts0_f = data['mkpts0_c']
        # Guard on the key that is actually read: the check used to test
        # 'scale0', which raised KeyError when only 'scale0' was present and
        # silently ignored 'scale1' when only 'scale1' was present.
        scale1 = scale * data['scale1'][data['b_ids']] if 'scale1' in data else scale
        # only the first len(mconf) offsets are applied to the coarse keypoints
        mkpts1_f = data['mkpts1_c'] + (coords_normed * (W // 2) * scale1)[:len(data['mconf'])]

        data.update({
            "mkpts0_f": mkpts0_f,
            "mkpts1_f": mkpts1_f
        })
import torch
@torch.no_grad()
def warp_kpts(kpts0, depth0, depth1, T_0to1, K0, K1):
    """ Warp kpts0 from I0 to I1 with depth, K and Rt
    Also check covisibility and depth consistency.
    Depth is consistent if relative error < 0.2 (hard-coded).

    Args:
        kpts0 (torch.Tensor): [N, L, 2] - <x, y>,
        depth0 (torch.Tensor): [N, H, W],
        depth1 (torch.Tensor): [N, H, W],
        T_0to1 (torch.Tensor): [N, 3, 4],
        K0 (torch.Tensor): [N, 3, 3],
        K1 (torch.Tensor): [N, 3, 3],
    Returns:
        calculable_mask (torch.Tensor): [N, L]
        warped_keypoints0 (torch.Tensor): [N, L, 2] <x0_hat, y0_hat>
    """
    def _depth_at(depth, pixels):
        # per-sample lookup depth[i, y, x] for integer pixel coordinates
        return torch.stack(
            [depth[i, pixels[i, :, 1], pixels[i, :, 0]] for i in range(pixels.shape[0])], dim=0
        )  # (N, L)

    # Sample depth at the (rounded) keypoints; depth == 0 is treated as missing
    src_pixels = kpts0.round().long()
    src_depth = _depth_at(depth0, src_pixels)
    nonzero_mask = src_depth != 0

    # Unproject: homogeneous pixel * depth, then K0^-1
    homogeneous_one = torch.ones_like(kpts0[:, :, [0]])
    src_h = torch.cat([kpts0, homogeneous_one], dim=-1) * src_depth[..., None]  # (N, L, 3)
    src_cam = K0.inverse() @ src_h.transpose(2, 1)  # (N, 3, L)

    # Rigid transform into the frame of camera 1
    rotation = T_0to1[:, :3, :3]
    translation = T_0to1[:, :3, [3]]
    dst_cam = rotation @ src_cam + translation  # (N, 3, L)
    dst_depth_computed = dst_cam[:, 2, :]

    # Project with K1
    dst_h = (K1 @ dst_cam).transpose(2, 1)  # (N, L, 3)
    w_kpts0 = dst_h[:, :, :2] / (dst_h[:, :, [2]] + 1e-4)  # (N, L, 2), +1e-4 to avoid zero depth

    # Covisibility: the warped point must fall strictly inside image 1
    h, w = depth1.shape[1:3]
    xs, ys = w_kpts0[:, :, 0], w_kpts0[:, :, 1]
    covisible_mask = (xs > 0) * (xs < w-1) * (ys > 0) * (ys < h-1)

    # Points outside are redirected to pixel (0, 0) so the lookup stays in range
    dst_pixels = w_kpts0.long()
    dst_pixels[~covisible_mask, :] = 0
    dst_depth = _depth_at(depth1, dst_pixels)

    # Depth consistency: relative error between sampled and computed depth
    consistent_mask = ((dst_depth - dst_depth_computed) / dst_depth).abs() < 0.2
    valid_mask = nonzero_mask * covisible_mask * consistent_mask

    return valid_mask, w_kpts0
import math
import torch
from torch import nn
class PositionEncodingSine(nn.Module):
    """
    Sinusoidal position encoding generalised to 2-dimensional images.
    """

    def __init__(self, d_model, max_shape=(256, 256), temp_bug_fix=True):
        """
        Args:
            max_shape (tuple): for 1/8 featmap, the max length of 256 corresponds to 2048 pixels
            temp_bug_fix (bool): As noted in this [issue](https://github.com/zju3dv/LoFTR/issues/41),
                the original implementation of LoFTR includes a bug in the pos-enc impl, which has little impact
                on the final performance. For now, we keep both impls for backward compatibility.
                We will remove the buggy impl after re-training all variants of our released models.
        """
        super().__init__()

        # positions start at 1 (cumulative sum of ones) along rows / columns
        ones = torch.ones(max_shape)
        y_position = ones.cumsum(0).float().unsqueeze(0)
        x_position = ones.cumsum(1).float().unsqueeze(0)

        half_dim = d_model//2
        if temp_bug_fix:
            exponent_scale = -math.log(10000.0) / half_dim
        else:  # a buggy implementation (for backward compatibility only)
            # floor division is applied after the true division here
            exponent_scale = -math.log(10000.0) / d_model//2
        div_term = torch.exp(torch.arange(0, half_dim, 2).float() * exponent_scale)
        div_term = div_term[:, None, None]  # [C//4, 1, 1]

        # channels are interleaved as sin(x), cos(x), sin(y), cos(y)
        pe = torch.zeros((d_model, *max_shape))
        waves = (
            torch.sin(x_position * div_term),
            torch.cos(x_position * div_term),
            torch.sin(y_position * div_term),
            torch.cos(y_position * div_term),
        )
        for offset, wave in enumerate(waves):
            pe[offset::4, :, :] = wave

        # non-persistent: the table is not stored in the state dict
        self.register_buffer('pe', pe.unsqueeze(0), persistent=False)  # [1, C, H, W]

    def forward(self, x):
        """
        Args:
            x: [N, C, H, W]
        Returns:
            x plus the encoding cropped to the spatial size of x
        """
        height, width = x.size(2), x.size(3)
        return x + self.pe[:, :, :height, :width]
from math import log
from loguru import logger
import torch
from einops import repeat
from kornia.utils import create_meshgrid
from .geometry import warp_kpts
############## ↓ Coarse-Level supervision ↓ ##############
@torch.no_grad()
def mask_pts_at_padded_regions(grid_pt, mask):
    """For megadepth dataset, zero-padding exists in images.

    Sets (in place) every grid point whose mask entry is zero to 0 and
    returns `grid_pt`.
    """
    # flatten the mask to [n, h*w] and duplicate it for the two coordinates
    per_coord_mask = repeat(mask, 'n h w -> n (h w) c', c=2)
    padded = ~per_coord_mask.bool()
    grid_pt[padded] = 0
    return grid_pt
@torch.no_grad()
def spvs_coarse(data, config):
    """
    Build the coarse-level ground truth by warping coarse grid points between the two views.

    Update:
        data (dict): {
            "conf_matrix_gt": [N, hw0, hw1],
            'spv_b_ids': [M]
            'spv_i_ids': [M]
            'spv_j_ids': [M]
            'spv_w_pt0_i': [N, hw0, 2], in original image resolution
            'spv_pt1_i': [N, hw1, 2], in original image resolution
        }

    NOTE:
        - for scannet dataset, there're 3 kinds of resolution {i, c, f}
        - for megadepth dataset, there're 4 kinds of resolution {i, i_resize, c, f}
    """
    # 1. misc
    image0, image1 = data['image0'], data['image1']
    device = image0.device
    N, _, H0, W0 = image0.shape
    _, _, H1, W1 = image1.shape
    scale = config['LOFTR']['RESOLUTION'][0]
    scale0 = scale * data['scale0'][:, None] if 'scale0' in data else scale
    scale1 = scale * data['scale1'][:, None] if 'scale1' in data else scale
    h0, w0, h1, w1 = (dim // scale for dim in (H0, W0, H1, W1))

    # 2. warp grids
    def coarse_grid(h, w):
        # pixel-coordinate meshgrid flattened to [N, hw, 2]
        return create_meshgrid(h, w, False, device).reshape(1, h*w, 2).repeat(N, 1, 1)

    # create kpts in meshgrid and resize them to image resolution
    grid_pt0_i = scale0 * coarse_grid(h0, w0)
    grid_pt1_i = scale1 * coarse_grid(h1, w1)

    # mask padded region to (0, 0), so no need to manually mask conf_matrix_gt
    if 'mask0' in data:
        grid_pt0_i = mask_pts_at_padded_regions(grid_pt0_i, data['mask0'])
        grid_pt1_i = mask_pts_at_padded_regions(grid_pt1_i, data['mask1'])

    # warp kpts bi-directionally and resize them to coarse-level resolution
    # (no depth consistency check, since it leads to worse results experimentally)
    # (unhandled edge case: points with 0-depth will be warped to the left-up corner)
    _, w_pt0_i = warp_kpts(grid_pt0_i, data['depth0'], data['depth1'], data['T_0to1'], data['K0'], data['K1'])
    _, w_pt1_i = warp_kpts(grid_pt1_i, data['depth1'], data['depth0'], data['T_1to0'], data['K1'], data['K0'])
    w_pt0_c = w_pt0_i / scale1
    w_pt1_c = w_pt1_i / scale0

    # 3. check if mutual nearest neighbor
    w_pt0_c_round = w_pt0_c.round().long()
    w_pt1_c_round = w_pt1_c.round().long()
    nearest_index1 = w_pt0_c_round[..., 0] + w_pt0_c_round[..., 1] * w1
    nearest_index0 = w_pt1_c_round[..., 0] + w_pt1_c_round[..., 1] * w0

    # corner case: out of boundary
    def out_bound_mask(pt, w, h):
        xs, ys = pt[..., 0], pt[..., 1]
        return (xs < 0) | (xs >= w) | (ys < 0) | (ys >= h)

    nearest_index1[out_bound_mask(w_pt0_c_round, w1, h1)] = 0
    nearest_index0[out_bound_mask(w_pt1_c_round, w0, h0)] = 0

    # follow 0 -> 1 -> 0 and keep the cells that land back on themselves
    loop_back = torch.stack(
        [back[fwd] for back, fwd in zip(nearest_index0, nearest_index1)], dim=0)
    correct_0to1 = loop_back == torch.arange(h0*w0, device=device)[None]
    correct_0to1[:, 0] = False  # ignore the top-left corner

    # 4. construct a gt conf_matrix
    conf_matrix_gt = torch.zeros(N, h0*w0, h1*w1, device=device)
    b_ids, i_ids = torch.where(correct_0to1)
    j_ids = nearest_index1[b_ids, i_ids]
    conf_matrix_gt[b_ids, i_ids, j_ids] = 1
    data.update({'conf_matrix_gt': conf_matrix_gt})

    # 5. save coarse matches(gt) for training fine level
    if len(b_ids) == 0:
        logger.warning(f"No groundtruth coarse match found for: {data['pair_names']}")
        # this won't affect fine-level loss calculation
        b_ids = torch.tensor([0], device=device)
        i_ids = torch.tensor([0], device=device)
        j_ids = torch.tensor([0], device=device)

    data.update({
        'spv_b_ids': b_ids,
        'spv_i_ids': i_ids,
        'spv_j_ids': j_ids
    })

    # 6. save intermediate results (for fast fine-level computation)
    data.update({
        'spv_w_pt0_i': w_pt0_i,
        'spv_pt1_i': grid_pt1_i
    })
def compute_supervision_coarse(data, config):
    """Dispatch coarse-level supervision according to the batch's dataset.

    Args:
        data (dict): batch; `data['dataset_name']` holds one name per sample.
        config: passed through to `spvs_coarse`.
    Raises:
        AssertionError: the batch mixes samples from different datasets.
        ValueError: the dataset is neither ScanNet nor MegaDepth.
    """
    assert len(set(data['dataset_name'])) == 1, "Do not support mixed datasets training!"
    data_source = data['dataset_name'][0]
    if data_source.lower() not in ['scannet', 'megadepth']:
        raise ValueError(f'Unknown data source: {data_source}')
    spvs_coarse(data, config)
############## ↓ Fine-Level supervision ↓ ##############
@torch.no_grad()
def spvs_fine(data, config):
    """
    Compute the fine-level ground-truth offsets for the predicted coarse matches.

    Reads 'spv_w_pt0_i' / 'spv_pt1_i' (stored by the coarse supervision) and the
    coarse predictions 'b_ids' / 'i_ids' / 'j_ids' from `data`.

    Update:
        data (dict):{
            "expec_f_gt": [M, 2]}
    """
    # 1. misc
    w_pt0_i, pt1_i = data['spv_w_pt0_i'], data['spv_pt1_i']
    scale = config['LOFTR']['RESOLUTION'][1]
    radius = config['LOFTR']['FINE_WINDOW_SIZE'] // 2

    # 2. get coarse prediction
    b_ids, i_ids, j_ids = data['b_ids'], data['i_ids'], data['j_ids']

    # 3. compute gt
    # The offset lives in image1, so it is image1's resize factor that applies.
    # Guard on the key that is actually read ('scale1', not 'scale0').
    if 'scale1' in data:
        scale = scale * data['scale1'][b_ids]
    # `expec_f_gt` might exceed the window, i.e. abs(*) > 1, which would be filtered later
    expec_f_gt = (w_pt0_i[b_ids, i_ids] - pt1_i[b_ids, j_ids]) / scale / radius  # [M, 2]
    data.update({"expec_f_gt": expec_f_gt})
def compute_supervision_fine(data, config):
    """Dispatch fine-level supervision according to the batch's dataset.

    Only the first entry of `data['dataset_name']` is inspected.

    Raises:
        NotImplementedError: the dataset is neither ScanNet nor MegaDepth.
    """
    data_source = data['dataset_name'][0]
    if data_source.lower() not in ['scannet', 'megadepth']:
        raise NotImplementedError
    spvs_fine(data, config)
from loguru import logger
import torch
import torch.nn as nn
class LoFTRLoss(nn.Module):
    """Training loss of LoFTR: a coarse matching term plus a fine regression term."""

    def __init__(self, config):
        """
        Args:
            config (dict): global config; `config['loftr']['loss']` and
                `config['loftr']['match_coarse']` are read.
        """
        super().__init__()
        self.config = config  # config under the global namespace
        self.loss_config = config['loftr']['loss']
        coarse_matching = self.config['loftr']['match_coarse']
        self.match_type = coarse_matching['match_type']
        self.sparse_spvs = coarse_matching['sparse_spvs']

        # coarse-level
        self.correct_thr = self.loss_config['fine_correct_thr']
        self.c_pos_w = self.loss_config['pos_weight']
        self.c_neg_w = self.loss_config['neg_weight']

        # fine-level
        self.fine_type = self.loss_config['fine_type']

    def compute_coarse_loss(self, conf, conf_gt, weight=None):
        """ Point-wise CE / Focal Loss with 0 / 1 confidence as gt.
        Args:
            conf (torch.Tensor): (N, HW0, HW1) / (N, HW0+1, HW1+1)
            conf_gt (torch.Tensor): (N, HW0, HW1)
            weight (torch.Tensor): (N, HW0, HW1); entry [0, 0, 0] is overwritten
                in place when a fake sample has to be injected.
        Raises:
            ValueError: unknown `coarse_type` in the loss config.
        """
        pos_mask = conf_gt == 1
        neg_mask = conf_gt == 0
        c_pos_w, c_neg_w = self.c_pos_w, self.c_neg_w

        # corner case: no gt coarse-level match at all
        if not pos_mask.any():  # assign a wrong gt
            pos_mask[0, 0, 0] = True
            if weight is not None:
                weight[0, 0, 0] = 0.
            c_pos_w = 0.
        if not neg_mask.any():
            neg_mask[0, 0, 0] = True
            if weight is not None:
                weight[0, 0, 0] = 0.
            c_neg_w = 0.

        coarse_type = self.loss_config['coarse_type']

        if coarse_type == 'cross_entropy':
            assert not self.sparse_spvs, 'Sparse Supervision for cross-entropy not implemented!'
            conf = torch.clamp(conf, 1e-6, 1-1e-6)
            loss_pos = - torch.log(conf[pos_mask])
            loss_neg = - torch.log(1 - conf[neg_mask])
            if weight is not None:
                loss_pos = loss_pos * weight[pos_mask]
                loss_neg = loss_neg * weight[neg_mask]
            return c_pos_w * loss_pos.mean() + c_neg_w * loss_neg.mean()

        if coarse_type != 'focal':
            raise ValueError(f'Unknown coarse loss: {coarse_type}')

        conf = torch.clamp(conf, 1e-6, 1-1e-6)
        alpha = self.loss_config['focal_alpha']
        gamma = self.loss_config['focal_gamma']

        if not self.sparse_spvs:
            # dense supervision (in the case of match_type=='sinkhorn', the dustbin is not supervised.)
            # each negative element occupies a smaller proportion than positive
            # elements => higher negative loss weight needed
            loss_pos = - alpha * torch.pow(1 - conf[pos_mask], gamma) * (conf[pos_mask]).log()
            loss_neg = - alpha * torch.pow(conf[neg_mask], gamma) * (1 - conf[neg_mask]).log()
            if weight is not None:
                loss_pos = loss_pos * weight[pos_mask]
                loss_neg = loss_neg * weight[neg_mask]
            return c_pos_w * loss_pos.mean() + c_neg_w * loss_neg.mean()

        # sparse supervision: positive and negative elements occupy similar
        # proportions => more balanced loss weights needed
        with_dustbin = self.match_type == 'sinkhorn'
        pos_conf = conf[:, :-1, :-1][pos_mask] if with_dustbin else conf[pos_mask]
        loss_pos = - alpha * torch.pow(1 - pos_conf, gamma) * pos_conf.log()
        if weight is not None:
            # Different from dense-spvs, the loss w.r.t. padded regions aren't directly zeroed out,
            # but only through manually setting corresponding regions in sim_matrix to '-inf'.
            loss_pos = loss_pos * weight[pos_mask]

        if not with_dustbin:
            # There is no dustbin for dual_softmax, so unmatchable patches are left without
            # supervision (pseudo negative samples could be added instead).
            return c_pos_w * loss_pos.mean()

        # negatives: rows / columns without any gt match should go to the dustbin
        neg0, neg1 = conf_gt.sum(-1) == 0, conf_gt.sum(1) == 0
        neg_conf = torch.cat([conf[:, :-1, -1][neg0], conf[:, -1, :-1][neg1]], 0)
        loss_neg = - alpha * torch.pow(1 - neg_conf, gamma) * neg_conf.log()
        if weight is not None:
            # keep only negatives whose row / column has some non-zero weight
            neg_w0 = (weight.sum(-1) != 0)[neg0]
            neg_w1 = (weight.sum(1) != 0)[neg1]
            loss_neg = loss_neg[torch.cat([neg_w0, neg_w1], 0)]
        return c_pos_w * loss_pos.mean() + c_neg_w * loss_neg.mean()

    def compute_fine_loss(self, expec_f, expec_f_gt):
        """Dispatch to the fine-level loss selected by `fine_type`.

        Raises:
            NotImplementedError: `fine_type` is neither 'l2_with_std' nor 'l2'.
        """
        if self.fine_type == 'l2_with_std':
            return self._compute_fine_loss_l2_std(expec_f, expec_f_gt)
        if self.fine_type == 'l2':
            return self._compute_fine_loss_l2(expec_f, expec_f_gt)
        raise NotImplementedError()

    def _compute_fine_loss_l2(self, expec_f, expec_f_gt):
        """
        Args:
            expec_f (torch.Tensor): [M, 2] <x, y>
            expec_f_gt (torch.Tensor): [M, 2] <x, y>
        Returns:
            Mean squared offset over pairs whose gt lies within `correct_thr`,
            or None in eval mode when there is no such pair.
        """
        correct_mask = torch.linalg.norm(expec_f_gt, ord=float('inf'), dim=1) < self.correct_thr
        if not correct_mask.any():
            if not self.training:
                return None
            # this seldomly happen when training, since we pad prediction with gt
            logger.warning("assign a false supervision to avoid ddp deadlock")
            correct_mask[0] = True
        residual = expec_f_gt[correct_mask] - expec_f[correct_mask]
        return (residual ** 2).sum(-1).mean()

    def _compute_fine_loss_l2_std(self, expec_f, expec_f_gt):
        """
        Args:
            expec_f (torch.Tensor): [M, 3] <x, y, std>
            expec_f_gt (torch.Tensor): [M, 2] <x, y>
        Returns:
            Uncertainty-weighted mean squared offset, or None in eval mode when
            no pair has its gt within `correct_thr`.
        """
        # correct_mask tells you which pair to compute fine-loss
        correct_mask = torch.linalg.norm(expec_f_gt, ord=float('inf'), dim=1) < self.correct_thr

        # use std as weight that measures uncertainty
        inverse_std = 1. / torch.clamp(expec_f[:, 2], min=1e-10)
        # detached: avoid minimizing the loss through increasing std
        weight = (inverse_std / torch.mean(inverse_std)).detach()

        # corner case: no correct coarse match found
        if not correct_mask.any():
            if not self.training:
                return None
            # this seldomly happen during training, since we pad prediction with gt
            # sometimes there is not coarse-level gt at all.
            logger.warning("assign a false supervision to avoid ddp deadlock")
            correct_mask[0] = True
            weight[0] = 0.

        # l2 loss with std
        offset_l2 = ((expec_f_gt[correct_mask] - expec_f[correct_mask, :2]) ** 2).sum(-1)
        return (offset_l2 * weight[correct_mask]).mean()

    @torch.no_grad()
    def compute_c_weight(self, data):
        """ compute element-wise weights for computing coarse-level loss. """
        if 'mask0' not in data:
            return None
        flat0 = data['mask0'].flatten(-2)[..., None]
        flat1 = data['mask1'].flatten(-2)[:, None]
        return (flat0 * flat1).float()

    def forward(self, data):
        """
        Update:
            data (dict): update{
                'loss': [1] the reduced loss across a batch,
                'loss_scalars' (dict): loss scalars for tensorboard_record
            }
        """
        loss_scalars = {}

        # 0. compute element-wise loss weight
        c_weight = self.compute_c_weight(data)

        # 1. coarse-level loss
        use_dustbin = self.sparse_spvs and self.match_type == 'sinkhorn'
        conf = data['conf_matrix_with_bin'] if use_dustbin else data['conf_matrix']
        loss_c = self.compute_coarse_loss(conf, data['conf_matrix_gt'], weight=c_weight)
        loss = loss_c * self.loss_config['coarse_weight']
        loss_scalars.update({"loss_c": loss_c.clone().detach().cpu()})

        # 2. fine-level loss
        loss_f = self.compute_fine_loss(data['expec_f'], data['expec_f_gt'])
        if loss_f is None:
            assert self.training is False
            loss_scalars.update({'loss_f': torch.tensor(1.)})  # 1 is the upper bound
        else:
            loss += loss_f * self.loss_config['fine_weight']
            loss_scalars.update({"loss_f": loss_f.clone().detach().cpu()})

        loss_scalars.update({'loss': loss.clone().detach().cpu()})
        data.update({"loss": loss, "loss_scalars": loss_scalars})
import torch
from torch.optim.lr_scheduler import MultiStepLR, CosineAnnealingLR, ExponentialLR
def build_optimizer(model, config):
    """Create the optimizer named by `config.TRAINER.OPTIMIZER` for `model`.

    Uses `TRAINER.TRUE_LR` as learning rate and `TRAINER.ADAM_DECAY` /
    `TRAINER.ADAMW_DECAY` as weight decay.

    Raises:
        ValueError: the optimizer name is neither "adam" nor "adamw".
    """
    name = config.TRAINER.OPTIMIZER
    lr = config.TRAINER.TRUE_LR

    if name == "adam":
        decay = config.TRAINER.ADAM_DECAY
        return torch.optim.Adam(model.parameters(), lr=lr, weight_decay=decay)
    if name == "adamw":
        decay = config.TRAINER.ADAMW_DECAY
        return torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=decay)
    raise ValueError(f"TRAINER.OPTIMIZER = {name} is not a valid optimizer!")
def build_scheduler(config, optimizer):
    """
    Build the learning-rate scheduler description named by `config.TRAINER.SCHEDULER`.

    Returns:
        scheduler (dict):{
            'scheduler': lr_scheduler,
            'interval': 'step',  # or 'epoch'
            'monitor': 'val_f1', (optional)
            'frequency': x, (optional)
        }
        Only 'interval' and 'scheduler' are set here.
    Raises:
        NotImplementedError: unknown scheduler name.
    """
    trainer = config.TRAINER
    scheduler = {'interval': trainer.SCHEDULER_INTERVAL}
    name = trainer.SCHEDULER

    if name == 'MultiStepLR':
        lr_scheduler = MultiStepLR(optimizer, trainer.MSLR_MILESTONES, gamma=trainer.MSLR_GAMMA)
    elif name == 'CosineAnnealing':
        lr_scheduler = CosineAnnealingLR(optimizer, trainer.COSA_TMAX)
    elif name == 'ExponentialLR':
        lr_scheduler = ExponentialLR(optimizer, trainer.ELR_GAMMA)
    else:
        raise NotImplementedError()

    scheduler['scheduler'] = lr_scheduler
    return scheduler
import albumentations as A
class DarkAug(object):
    """
    Extreme dark augmentation aiming at Aachen Day-Night
    """

    def __init__(self) -> None:
        # the whole pipeline is applied with probability 0.75
        darkening_steps = [
            A.RandomBrightnessContrast(p=0.75, brightness_limit=(-0.6, 0.0), contrast_limit=(-0.5, 0.3)),
            A.Blur(p=0.1, blur_limit=(3, 9)),
            A.MotionBlur(p=0.2, blur_limit=(3, 25)),
            A.RandomGamma(p=0.1, gamma_limit=(15, 65)),
            A.HueSaturationValue(p=0.1, val_shift_limit=(-100, -40)),
        ]
        self.augmentor = A.Compose(darkening_steps, p=0.75)

    def __call__(self, x):
        """Run the pipeline on image `x` and return the augmented image."""
        augmented = self.augmentor(image=x)
        return augmented['image']
class MobileAug(object):
    """
    Random augmentations aiming at images of mobile/handhold devices.
    """

    def __init__(self):
        # the pipeline itself always runs (p=1.0); each step has its own probability
        mobile_steps = [
            A.MotionBlur(p=0.25),
            A.ColorJitter(p=0.5),
            A.RandomRain(p=0.1),  # random occlusion
            A.RandomSunFlare(p=0.1),
            A.JpegCompression(p=0.25),
            A.ISONoise(p=0.25),
        ]
        self.augmentor = A.Compose(mobile_steps, p=1.0)

    def __call__(self, x):
        """Run the pipeline on image `x` and return the augmented image."""
        augmented = self.augmentor(image=x)
        return augmented['image']
def build_augmentor(method=None, **kwargs):
    """Return the augmentation callable for `method`.

    Augmentation is currently switched off: any non-None `method` raises, so
    the only successful result is None.

    Args:
        method (str, optional): augmentation name; None disables augmentation.
        **kwargs: accepted and ignored.
    Raises:
        NotImplementedError: `method` is not None.
    """
    if method is not None:
        raise NotImplementedError('Using of augmentation functions are not supported yet!')

    # The dispatch below is unreachable for non-None methods while the guard
    # above is in place; it is retained for when augmentation is re-enabled.
    if method is None:
        return None
    if method == 'dark':
        return DarkAug()
    if method == 'mobile':
        return MobileAug()
    raise ValueError(f'Invalid augmentation method: {method}')
# Manual smoke check. build_augmentor raises NotImplementedError for every
# non-None method, so running this module as a script currently always raises.
if __name__ == '__main__':
    augmentor = build_augmentor('FDA')
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
[Copied from detectron2]
This file contains primitives for multi-gpu communication.
This is useful when doing distributed training.
"""
import functools
import logging
import numpy as np
import pickle
import torch
import torch.distributed as dist
# Read by get_local_rank() and get_local_size(); nothing in this module assigns
# it, so it has to be set from outside before get_local_rank() is used in an
# initialized distributed run (that function asserts it is not None).
_LOCAL_PROCESS_GROUP = None
"""
A torch process group which only includes processes that on the same machine as the current process.
This variable is set when processes are spawned by `launch()` in "engine/launch.py".
"""
def get_world_size() -> int:
    """Return the number of distributed processes, or 1 when torch.distributed is unavailable or uninitialized."""
    if not (dist.is_available() and dist.is_initialized()):
        return 1
    return dist.get_world_size()
def get_rank() -> int:
    """Return this process's global rank, or 0 when torch.distributed is unavailable or uninitialized."""
    if not (dist.is_available() and dist.is_initialized()):
        return 0
    return dist.get_rank()
def get_local_rank() -> int:
    """
    Returns:
        The rank of the current process within the local (per-machine) process group;
        0 when torch.distributed is unavailable or uninitialized.
    """
    if not (dist.is_available() and dist.is_initialized()):
        return 0
    # the local group must have been set up before asking for a local rank
    assert _LOCAL_PROCESS_GROUP is not None
    return dist.get_rank(group=_LOCAL_PROCESS_GROUP)
def get_local_size() -> int:
    """
    Returns:
        The size of the per-machine process group,
        i.e. the number of processes per machine;
        1 when torch.distributed is unavailable or uninitialized.
    """
    if not (dist.is_available() and dist.is_initialized()):
        return 1
    return dist.get_world_size(group=_LOCAL_PROCESS_GROUP)
def is_main_process() -> bool:
    """Return True when this process has global rank 0."""
    rank = get_rank()
    return rank == 0
def synchronize():
    """
    Helper function to synchronize (barrier) among all processes when
    using distributed training. Does nothing when torch.distributed is
    unavailable, uninitialized, or there is a single process.
    """
    if not (dist.is_available() and dist.is_initialized()):
        return
    if dist.get_world_size() == 1:
        return
    dist.barrier()
@functools.lru_cache()
def _get_global_gloo_group():
    """
    Return a process group based on gloo backend, containing all the ranks
    The result is cached.
    """
    # under nccl a separate gloo group is created; any other backend reuses the world group
    uses_nccl = dist.get_backend() == "nccl"
    return dist.new_group(backend="gloo") if uses_nccl else dist.group.WORLD
def _serialize_to_tensor(data, group):
    """Pickle `data` into a 1-D uint8 tensor placed on the device matching `group`'s backend.

    gloo -> cpu, nccl -> cuda. A warning is logged when the pickled payload
    exceeds 1 GiB.
    """
    backend = dist.get_backend(group)
    assert backend in ["gloo", "nccl"]
    device = torch.device("cuda" if backend != "gloo" else "cpu")

    payload = pickle.dumps(data)
    one_gib = 1024 ** 3
    if len(payload) > one_gib:
        logging.getLogger(__name__).warning(
            "Rank {} trying to all-gather {:.2f} GB of data on device {}".format(
                get_rank(), len(payload) / one_gib, device
            )
        )
    byte_storage = torch.ByteStorage.from_buffer(payload)
    return torch.ByteTensor(byte_storage).to(device=device)
def _pad_to_largest_tensor(tensor, group):
    """
    Exchange tensor sizes across `group` and zero-pad the local tensor to the largest one.

    Returns:
        list[int]: size of the tensor, on each rank
        Tensor: padded tensor that has the max size
    """
    world_size = dist.get_world_size(group=group)
    assert (
        world_size >= 1
    ), "comm.gather/all_gather must be called from ranks within the given group!"

    n_local = tensor.numel()
    local_size = torch.tensor([n_local], dtype=torch.int64, device=tensor.device)
    gathered_sizes = [
        torch.zeros([1], dtype=torch.int64, device=tensor.device) for _ in range(world_size)
    ]
    dist.all_gather(gathered_sizes, local_size, group=group)
    size_list = [int(size.item()) for size in gathered_sizes]
    max_size = max(size_list)

    # we pad the tensor because torch all_gather does not support
    # gathering tensors of different shapes
    missing = max_size - n_local
    if missing != 0:
        padding = torch.zeros((missing,), dtype=torch.uint8, device=tensor.device)
        tensor = torch.cat((tensor, padding), dim=0)
    return size_list, tensor
def all_gather(data, group=None):
    """
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data: any picklable object
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.
    Returns:
        list[data]: list of data gathered from each rank
    """
    if get_world_size() == 1:
        return [data]
    if group is None:
        group = _get_global_gloo_group()
    if dist.get_world_size(group) == 1:
        return [data]

    local = _serialize_to_tensor(data, group)
    size_list, local = _pad_to_largest_tensor(local, group)
    max_size = max(size_list)

    # receiving Tensor from all ranks
    received = [
        torch.empty((max_size,), dtype=torch.uint8, device=local.device) for _ in size_list
    ]
    dist.all_gather(received, local, group=group)

    # NOTE: pickle.loads executes whatever the peers sent; only use among trusted ranks.
    return [
        pickle.loads(chunk.cpu().numpy().tobytes()[:size])
        for size, chunk in zip(size_list, received)
    ]
def gather(data, dst=0, group=None):
    """
    Run gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data: any picklable object
        dst (int): destination rank
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.
    Returns:
        list[data]: on dst, a list of data gathered from each rank. Otherwise,
            an empty list.
    """
    if get_world_size() == 1:
        return [data]
    if group is None:
        group = _get_global_gloo_group()
    if dist.get_world_size(group=group) == 1:
        return [data]

    rank = dist.get_rank(group=group)
    local = _serialize_to_tensor(data, group)
    size_list, local = _pad_to_largest_tensor(local, group)

    if rank != dst:
        # non-destination ranks only send
        dist.gather(local, [], dst=dst, group=group)
        return []

    # receiving Tensor from all ranks
    max_size = max(size_list)
    received = [
        torch.empty((max_size,), dtype=torch.uint8, device=local.device) for _ in size_list
    ]
    dist.gather(local, received, dst=dst, group=group)

    # NOTE: pickle.loads executes whatever the peers sent; only use among trusted ranks.
    return [
        pickle.loads(chunk.cpu().numpy().tobytes()[:size])
        for size, chunk in zip(size_list, received)
    ]
def shared_random_seed():
    """
    Returns:
        int: a random number that is the same across all workers.
            If workers need a shared RNG, they can use this shared seed to
            create one.
    All workers must call this function, otherwise it will deadlock.
    """
    local_draw = np.random.randint(2 ** 31)
    # every rank adopts the first gathered draw
    return all_gather(local_draw)[0]
def reduce_dict(input_dict, average=True):
    """
    Reduce the values in the dictionary from all processes so that process with rank
    0 has the reduced results.

    Args:
        input_dict (dict): inputs to be reduced. All the values must be scalar CUDA Tensor.
        average (bool): whether to do average or sum
    Returns:
        a dict with the same keys as input_dict, after reduction.
        With fewer than two processes `input_dict` itself is returned.
    """
    world_size = get_world_size()
    if world_size < 2:
        return input_dict

    with torch.no_grad():
        # sort the keys so that they are consistent across processes
        names = sorted(input_dict.keys())
        values = torch.stack([input_dict[name] for name in names], dim=0)
        dist.reduce(values, dst=0)
        if dist.get_rank() == 0 and average:
            # only main process gets accumulated, so only divide by
            # world_size in this case
            values /= world_size
        reduced_dict = dict(zip(names, values))
    return reduced_dict
import numpy as np
# --- PL-DATAMODULE ---
def get_local_split(items: list, world_size: int, rank: int, seed: int):
    """ The local rank only loads a split of the dataset.

    `items` is shuffled with `seed`, padded with randomly re-drawn items until
    its length is a multiple of `world_size`, and cut into equal consecutive
    slices; the slice belonging to `rank` is returned.
    """
    n_items = len(items)
    items_permute = np.random.RandomState(seed).permutation(items)

    leftover = n_items % world_size
    if leftover != 0:
        # a fresh RNG with the same seed draws the filler items (with replacement)
        padding = np.random.RandomState(seed).choice(
            items,
            world_size - leftover,
            replace=True)
        padded_items = np.concatenate([items_permute, padding])
    else:
        padded_items = items_permute

    assert len(padded_items) % world_size == 0, \
        f'len(padded_items): {len(padded_items)}; world_size: {world_size}; len(padding): {len(padding)}'

    n_per_rank = len(padded_items) // world_size
    start = n_per_rank * rank
    return padded_items[start: start + n_per_rank]
import io
from loguru import logger
import cv2
import numpy as np
import h5py
import torch
from numpy.linalg import inv
try:
# for internel use only
from .client import MEGADEPTH_CLIENT, SCANNET_CLIENT
except Exception:
MEGADEPTH_CLIENT = SCANNET_CLIENT = None
# --- DATA IO ---
def load_array_from_s3(
    path, client, cv_type,
    use_h5py=False,
):
    """Fetch `path` through `client` and decode it into an array.

    Args:
        path: object path handed to `client.Get`.
        client: object exposing `Get(path)` that returns the raw bytes.
        cv_type: OpenCV imread flag used when decoding an image
            (ignored when `use_h5py` is True).
        use_h5py (bool): decode the bytes as an HDF5 file and read its
            '/depth' dataset instead of decoding an image.
    Returns:
        The decoded array.
    Raises:
        Whatever the decoder raises (after printing the failing path);
        AssertionError when decoding produced None.
    """
    byte_str = client.Get(path)
    try:
        if not use_h5py:
            # np.frombuffer replaces the deprecated binary mode of np.fromstring
            raw_array = np.frombuffer(byte_str, np.uint8)
            data = cv2.imdecode(raw_array, cv_type)
        else:
            # close the HDF5 handle as soon as the dataset has been copied out
            with h5py.File(io.BytesIO(byte_str), 'r') as h5_file:
                data = np.array(h5_file['/depth'])
    except Exception:
        print(f"==> Data loading failure: {path}")
        raise  # bare raise keeps the original traceback

    assert data is not None
    return data
def imread_gray(path, augment_fn=None, client=SCANNET_CLIENT):
    """Read an image as grayscale, optionally augmenting it in RGB first.

    Args:
        path: local path or 's3://' URI of the image.
        augment_fn (callable, optional): applied to the RGB image before the
            final grayscale conversion.
        client: S3 client used for 's3://' paths.
    Returns:
        image: (h, w) grayscale image.
    """
    # augmentation works on color images, so decode in color when it is requested
    cv_type = cv2.IMREAD_GRAYSCALE if augment_fn is None \
        else cv2.IMREAD_COLOR
    if str(path).startswith('s3://'):
        image = load_array_from_s3(str(path), client, cv_type)
    else:
        image = cv2.imread(str(path), cv_type)

    if augment_fn is not None:
        # The image above was already decoded as BGR. Re-reading it here with
        # cv2.imread was redundant for local files and discarded the S3 result
        # (cv2.imread cannot open 's3://' URIs).
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = augment_fn(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    return image  # (h, w)
def get_resized_wh(w, h, resize=None):
    """Return (w, h) rescaled so that the longer edge equals `resize`.

    With `resize=None` the size is returned unchanged.
    """
    if resize is None:
        return w, h
    scale = resize / max(h, w)  # resize the longer edge
    return int(round(w*scale)), int(round(h*scale))
def get_divisible_wh(w, h, df=None):
    """Round (w, h) down to the nearest multiples of `df`.

    With `df=None` the size is returned unchanged.
    """
    if df is None:
        return w, h
    return int(w // df * df), int(h // df * df)
def pad_bottom_right(inp, pad_size, ret_mask=False):
    """Zero-pad the last two axes of `inp` to `pad_size` x `pad_size`, content kept top-left.

    Args:
        inp (np.ndarray): 2-D (h, w) or 3-D (c, h, w) array.
        pad_size (int): target edge length; must be >= max(h, w).
        ret_mask (bool): also return a boolean mask that is True on the
            original content and False on the padding.
    Returns:
        (padded, mask); mask is None unless `ret_mask` is True.
    Raises:
        NotImplementedError: `inp` is neither 2-D nor 3-D.
    """
    assert isinstance(pad_size, int) and pad_size >= max(inp.shape[-2:]), f"{pad_size} < {max(inp.shape[-2:])}"
    if inp.ndim not in (2, 3):
        raise NotImplementedError()

    height, width = inp.shape[-2:]
    # keep a leading channel axis when there is one
    out_shape = inp.shape[:-2] + (pad_size, pad_size)

    padded = np.zeros(out_shape, dtype=inp.dtype)
    padded[..., :height, :width] = inp

    mask = None
    if ret_mask:
        mask = np.zeros(out_shape, dtype=bool)
        mask[..., :height, :width] = True
    return padded, mask
# --- MEGADEPTH ---
def read_megadepth_gray(path, resize=None, df=None, padding=False, augment_fn=None):
    """
    Read a MegaDepth image as a normalized grayscale tensor.

    Args:
        resize (int, optional): the longer edge of resized images. None for no resize.
        df (int, optional): the resized width / height are rounded down to multiples of it.
        padding (bool): If set to 'True', zero-pad resized images to squared size.
        augment_fn (callable, optional): augments images with pre-defined visual effects
    Returns:
        image (torch.tensor): (1, h, w)
        mask (torch.tensor or None): (h, w) boolean mask of the non-padded
            region; None when `padding` is False.
        scale (torch.tensor): [w/w_new, h/h_new]
    """
    # read image
    image = imread_gray(path, augment_fn, client=MEGADEPTH_CLIENT)

    # resize image
    w, h = image.shape[1], image.shape[0]
    w_new, h_new = get_resized_wh(w, h, resize)
    w_new, h_new = get_divisible_wh(w_new, h_new, df)

    image = cv2.resize(image, (w_new, h_new))
    scale = torch.tensor([w/w_new, h/h_new], dtype=torch.float)

    if padding:  # padding
        pad_to = max(h_new, w_new)
        image, mask = pad_bottom_right(image, pad_to, ret_mask=True)
    else:
        mask = None

    image = torch.from_numpy(image).float()[None] / 255  # (h, w) -> (1, h, w) and normalized
    # without padding there is no mask; torch.from_numpy(None) would raise TypeError
    if mask is not None:
        mask = torch.from_numpy(mask)

    return image, mask, scale
def read_megadepth_depth(path, pad_to=None):
    """Read a MegaDepth depth map (HDF5 dataset 'depth') as a float tensor.

    Args:
        path: local path or 's3://' URI of the HDF5 file.
        pad_to (int, optional): zero-pad the map bottom-right to a
            `pad_to` x `pad_to` square.
    Returns:
        depth (torch.tensor): (h, w)
    """
    if str(path).startswith('s3://'):
        # pass a plain string, like the other S3 readers in this module do
        depth = load_array_from_s3(str(path), MEGADEPTH_CLIENT, None, use_h5py=True)
    else:
        # context manager closes the HDF5 handle instead of leaking it until GC
        with h5py.File(path, 'r') as h5_file:
            depth = np.array(h5_file['depth'])
    if pad_to is not None:
        depth, _ = pad_bottom_right(depth, pad_to, ret_mask=False)
    depth = torch.from_numpy(depth).float()  # (h, w)
    return depth
# --- ScanNet ---
def read_scannet_gray(path, resize=(640, 480), augment_fn=None):
    """
    Read a ScanNet image as a normalized grayscale tensor.

    Args:
        resize (tuple): align image to depthmap, in (w, h).
        augment_fn (callable, optional): augments images with pre-defined visual effects
    Returns:
        image (torch.tensor): (1, h, w), values divided by 255.
        (No mask or scale is returned, unlike the MegaDepth reader.)
    """
    # read and resize image
    gray = cv2.resize(imread_gray(path, augment_fn), resize)

    # (h, w) -> (1, h, w) and normalized
    return torch.from_numpy(gray).float()[None] / 255
def read_scannet_depth(path):
    """Read a ScanNet depth image and return it as a float tensor divided by 1000.

    NOTE(review): the /1000 presumably converts millimetres to metres -- confirm.

    Returns:
        depth (torch.tensor): (h, w)
    """
    path_str = str(path)
    if path_str.startswith('s3://'):
        raw = load_array_from_s3(path_str, SCANNET_CLIENT, cv2.IMREAD_UNCHANGED)
    else:
        raw = cv2.imread(path_str, cv2.IMREAD_UNCHANGED)
    return torch.from_numpy(raw / 1000).float()  # (h, w)
def read_scannet_pose(path):
    """ Read ScanNet's Camera2World pose and transform it to World2Camera.

    The file is parsed as a space-delimited text matrix and inverted.

    Returns:
        pose_w2c (np.ndarray): (4, 4)
    """
    return inv(np.loadtxt(path, delimiter=' '))
def read_scannet_intrinsic(path):
    """ Read ScanNet's intrinsic matrix and return the 3x3 matrix.

    The file is parsed as a space-delimited text matrix; its last row and
    last column are dropped.
    """
    full_matrix = np.loadtxt(path, delimiter=' ')
    return full_matrix[:-1, :-1]
import torch
import cv2
import numpy as np
from collections import OrderedDict
from loguru import logger
from kornia.geometry.epipolar import numeric
from kornia.geometry.conversions import convert_points_to_homogeneous
# --- METRICS ---
def relative_pose_error(T_0to1, R, t, ignore_gt_t_thr=0.0):
    """Angular errors (in degrees) of an estimated relative pose against the ground truth.

    Args:
        T_0to1 (np.ndarray): ground-truth transform; rotation is read from
            [:3, :3] and translation from [:3, 3].
        R (np.ndarray): (3, 3) estimated rotation.
        t (np.ndarray): (3,) estimated translation direction.
        ignore_gt_t_thr (float): when the gt translation norm is below this,
            the translation error is reported as 0.
    Returns:
        (t_err, R_err)
    """
    R_gt = T_0to1[:3, :3]
    t_gt = T_0to1[:3, 3]

    # angle error between 2 vectors
    norm_product = np.linalg.norm(t) * np.linalg.norm(t_gt)
    cos_t = np.clip(np.dot(t, t_gt) / norm_product, -1.0, 1.0)
    t_err = np.rad2deg(np.arccos(cos_t))
    t_err = np.minimum(t_err, 180 - t_err)  # handle E ambiguity
    if np.linalg.norm(t_gt) < ignore_gt_t_thr:  # pure rotation is challenging
        t_err = 0

    # angle error between 2 rotation matrices
    cos_r = (np.trace(np.dot(R.T, R_gt)) - 1) / 2
    cos_r = np.clip(cos_r, -1., 1.)  # handle numerical errors
    R_err = np.rad2deg(np.abs(np.arccos(cos_r)))

    return t_err, R_err
def symmetric_epipolar_distance(pts0, pts1, E, K0, K1):
    """Squared symmetric epipolar distance.
    This can be seen as a biased estimation of the reprojection error.
    Args:
        pts0 (torch.Tensor): [N, 2]
        pts1 (torch.Tensor): [N, 2]
        E (torch.Tensor): [3, 3]
        K0, K1 (torch.Tensor): [3, 3] intrinsics used to normalize the points
    Returns:
        torch.Tensor: [N] distances in normalized image coordinates
    """
    def normalize(pts, K):
        # subtract the principal point (K[0,2], K[1,2]), divide by the focals (K[0,0], K[1,1])
        center = K[[0, 1], [2, 2]][None]
        focal = K[[0, 1], [0, 1]][None]
        return convert_points_to_homogeneous((pts - center) / focal)

    pts0 = normalize(pts0, K0)
    pts1 = normalize(pts1, K1)

    Ep0 = pts0 @ E.T  # [N, 3]
    p1Ep0 = torch.sum(pts1 * Ep0, -1)  # [N,]
    Etp1 = pts1 @ E  # [N, 3]

    inv_norm0 = 1.0 / (Ep0[:, 0]**2 + Ep0[:, 1]**2)
    inv_norm1 = 1.0 / (Etp1[:, 0]**2 + Etp1[:, 1]**2)
    return p1Ep0**2 * (inv_norm0 + inv_norm1)  # N
def compute_symmetrical_epipolar_errors(data):
    """
    Compute the symmetric epipolar error of every fine match against the
    essential matrix derived from the ground-truth pose `T_0to1`.

    Update:
        data (dict):{"epi_errs": [M]}
    """
    T_0to1 = data['T_0to1']
    # E = [t]_x R
    Tx = numeric.cross_product_matrix(T_0to1[:, :3, 3])
    E_mat = Tx @ T_0to1[:, :3, :3]

    m_bids = data['m_bids']
    pts0 = data['mkpts0_f']
    pts1 = data['mkpts1_f']

    def errors_of_pair(bs):
        # matches belonging to batch element `bs`
        mask = m_bids == bs
        return symmetric_epipolar_distance(
            pts0[mask], pts1[mask], E_mat[bs], data['K0'][bs], data['K1'][bs])

    epi_errs = torch.cat([errors_of_pair(bs) for bs in range(Tx.size(0))], dim=0)
    data.update({'epi_errs': epi_errs})
def estimate_pose(kpts0, kpts1, K0, K1, thresh, conf=0.99999):
    """Estimate the relative pose from matched keypoints with RANSAC on the essential matrix.

    Args:
        kpts0, kpts1 (np.ndarray): [N, 2] matched pixel coordinates.
        K0, K1 (np.ndarray): [3, 3] camera intrinsics.
        thresh (float): RANSAC inlier threshold in pixels.
        conf (float): RANSAC confidence.
    Returns:
        (R, t, inliers) for the candidate with the most inliers, or None when
        fewer than 5 matches are given or no pose could be recovered.
    """
    if len(kpts0) < 5:
        return None
    # normalize keypoints
    kpts0 = (kpts0 - K0[[0, 1], [2, 2]][None]) / K0[[0, 1], [0, 1]][None]
    kpts1 = (kpts1 - K1[[0, 1], [2, 2]][None]) / K1[[0, 1], [0, 1]][None]

    # normalize ransac threshold by the mean of all four focal lengths
    # (the previous expression averaged K0[0, 0] and K1[1, 1] twice, ignoring
    # K0[1, 1] and K1[0, 0])
    ransac_thr = thresh / np.mean([K0[0, 0], K0[1, 1], K1[0, 0], K1[1, 1]])

    # compute pose with cv2
    E, mask = cv2.findEssentialMat(
        kpts0, kpts1, np.eye(3), threshold=ransac_thr, prob=conf, method=cv2.RANSAC)
    if E is None:
        print("\nE is None while trying to recover pose.\n")
        return None

    # recover pose from E; E may hold several stacked 3x3 candidates
    best_num_inliers = 0
    ret = None
    for _E in np.split(E, len(E) // 3):
        n, R, t, _ = cv2.recoverPose(_E, kpts0, kpts1, np.eye(3), 1e9, mask=mask)
        if n > best_num_inliers:
            ret = (R, t[:, 0], mask.ravel() > 0)
            best_num_inliers = n

    return ret
def compute_pose_errors(data, config):
    """
    Estimate the relative pose of every pair in the batch and record its errors.

    Pairs for which no pose can be estimated get infinite errors and an empty
    inlier mask.

    Update:
        data (dict):{
            "R_errs" List[float]: [N]
            "t_errs" List[float]: [N]
            "inliers" List[np.ndarray]: [N]
        }
    """
    pixel_thr = config.TRAINER.RANSAC_PIXEL_THR  # 0.5
    conf = config.TRAINER.RANSAC_CONF  # 0.99999
    data.update({'R_errs': [], 't_errs': [], 'inliers': []})

    m_bids = data['m_bids'].cpu().numpy()
    pts0 = data['mkpts0_f'].cpu().numpy()
    pts1 = data['mkpts1_f'].cpu().numpy()
    K0 = data['K0'].cpu().numpy()
    K1 = data['K1'].cpu().numpy()
    T_0to1 = data['T_0to1'].cpu().numpy()

    for bs in range(K0.shape[0]):
        mask = m_bids == bs
        ret = estimate_pose(pts0[mask], pts1[mask], K0[bs], K1[bs], pixel_thr, conf=conf)

        if ret is None:
            data['R_errs'].append(np.inf)
            data['t_errs'].append(np.inf)
            # the `np.bool` alias was removed in NumPy 1.24; builtin bool is equivalent
            data['inliers'].append(np.array([]).astype(bool))
        else:
            R, t, inliers = ret
            t_err, R_err = relative_pose_error(T_0to1[bs], R, t, ignore_gt_t_thr=0.0)
            data['R_errs'].append(R_err)
            data['t_errs'].append(t_err)
            data['inliers'].append(inliers)
# --- METRIC AGGREGATION ---
def error_auc(errors, thresholds):
    """
    Area under the cumulative error (recall) curve, normalized per threshold.

    Args:
        errors (list): [N,]
        thresholds (list): error thresholds at which the AUC is evaluated.
    Returns:
        dict: {'auc@<thr>': auc} with one entry per threshold.
    """
    # np.trapz was renamed to np.trapezoid in NumPy 2.0; use whichever exists
    integrate = getattr(np, 'trapezoid', None) or np.trapz

    # prepend 0 so the recall curve starts at the origin
    errors = [0] + sorted(list(errors))
    recall = list(np.linspace(0, 1, len(errors)))

    aucs = []
    # honor the caller's thresholds (they used to be overwritten with [5, 10, 20])
    for thr in thresholds:
        last_index = np.searchsorted(errors, thr)
        # extend the curve horizontally up to the threshold
        y = recall[:last_index] + [recall[last_index-1]]
        x = errors[:last_index] + [thr]
        aucs.append(integrate(y, x) / thr)

    return {f'auc@{t}': auc for t, auc in zip(thresholds, aucs)}
def epidist_prec(errors, thresholds, ret_dict=False):
    """Mean matching precision at each threshold.

    For every threshold, the fraction of errors below it is computed per item
    of `errors` (0 for an empty item) and averaged over the items.

    Args:
        errors: iterable of per-pair error arrays.
        thresholds (list): error thresholds.
        ret_dict (bool): return {'prec@<thr>': value} instead of a list.
    """
    def pair_precision(errs, thr):
        correct_mask = errs < thr
        return np.mean(correct_mask) if len(correct_mask) > 0 else 0

    precs = []
    for thr in thresholds:
        per_pair = [pair_precision(errs, thr) for errs in errors]
        precs.append(np.mean(per_pair) if len(per_pair) > 0 else 0)

    if not ret_dict:
        return precs
    return {f'prec@{t:.0e}': prec for t, prec in zip(thresholds, precs)}
def aggregate_metrics(metrics, epi_err_thr=5e-4):
    """ Aggregate metrics for the whole dataset:
    (This method should be called once per dataset)
    1. AUC of the pose error (angular) at the threshold [5, 10, 20]
    2. Mean matching precision at the threshold 5e-4(ScanNet), 1e-4(MegaDepth)

    Args:
        metrics (dict): reads 'identifiers', 'R_errs', 't_errs' and 'epi_errs'.
        epi_err_thr (float): epipolar-error threshold for the precision.
    Returns:
        dict: the AUC entries merged with the precision entry.
    """
    # filter duplicates: one index per identifier (the last occurrence wins)
    index_of = OrderedDict()
    for position, identifier in enumerate(metrics['identifiers']):
        index_of[identifier] = position
    unq_ids = list(index_of.values())
    logger.info(f'Aggregating metrics over {len(unq_ids)} unique items...')

    # pose auc: a pair's pose error is the larger of its rotation / translation errors
    angular_thresholds = [5, 10, 20]
    stacked_errs = np.stack([metrics['R_errs'], metrics['t_errs']])
    pose_errors = np.max(stacked_errs, axis=0)[unq_ids]
    aucs = error_auc(pose_errors, angular_thresholds)  # (auc@5, auc@10, auc@20)

    # matching precision
    dist_thresholds = [epi_err_thr]
    epi_errs = np.array(metrics['epi_errs'], dtype=object)[unq_ids]
    precs = epidist_prec(epi_errs, dist_thresholds, True)  # (prec@err_thr)

    return {**aucs, **precs}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment